Optimized Queries

ShotGrid MCP Server provides several techniques for optimizing queries to the ShotGrid API. This page covers best practices and patterns for writing efficient queries that minimize API calls and improve performance.

Field Hopping

Field hopping (also known as dot notation) allows you to retrieve data from related entities in a single query, reducing the number of API calls needed.

Basic Field Hopping

@server.tool()
def find_shots_with_sequence(project_id: int) -> list:
    """Find shots with their sequence information."""
    return server.connection.find(
        "Shot",
        [["project", "is", {"type": "Project", "id": project_id}]],
        [
            "code",
            "sg_status_list",
            # Field hopping to get sequence data
            "sg_sequence.Sequence.code",
            "sg_sequence.Sequence.description"
        ]
    )

This query retrieves shots and their related sequence information in a single API call, rather than requiring separate calls to fetch the sequence data.
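
Hopped values come back under their full dotted key (for example "sg_sequence.Sequence.code"), which can be awkward to consume. The following is a minimal sketch, not part of ShotGrid MCP Server, of regrouping one result row into nested dictionaries. The helper name `nest_hopped_fields` and the sample row are hypothetical.

```python
def nest_hopped_fields(row: dict) -> dict:
    """Regroup dotted keys under their link fields.

    Hypothetical helper for illustration only. A key such as
    "sg_sequence.Sequence.code" alternates link field and entity type
    and ends with the field name.
    """
    nested: dict = {}
    for key, value in row.items():
        parts = key.split(".")
        # Every second element before the last one is a link field name
        links, field_name = parts[:-1:2], parts[-1]
        target = nested
        for link in links:
            target = target.setdefault(link, {})
        target[field_name] = value
    return nested


# Made-up row shaped like one result of the query above
row = {
    "type": "Shot",
    "id": 1,
    "code": "SH010",
    "sg_sequence.Sequence.code": "SEQ001",
}
nested_row = nest_hopped_fields(row)
```

The sketch assumes the link field itself (here "sg_sequence") was not requested alongside its hopped fields; if it was, the two would need to be merged explicitly.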

Multi-level Field Hopping

You can hop through multiple levels of relationships:

@server.tool()
def find_tasks_with_details(project_id: int) -> list:
    """Find tasks with details about their entity and assignees."""
    return server.connection.find(
        "Task",
        [["project", "is", {"type": "Project", "id": project_id}]],
        [
            "content",
            "sg_status_list",
            # Entity information
            "entity.Shot.code",
            "entity.Shot.sg_status_list",
            # Sequence information (through the shot)
            "entity.Shot.sg_sequence.Sequence.code",
            # Assignee information
            "task_assignees.HumanUser.name",
            "task_assignees.HumanUser.email"
        ]
    )
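
Multi-level paths follow a regular pattern: each hop contributes a link field name followed by the linked entity type, and the path ends with the field to read. A small sketch of a builder for such paths, assuming a hypothetical helper named `hop_path`:

```python
from typing import List, Tuple


def hop_path(links: List[Tuple[str, str]], field: str) -> str:
    """Build a dot-notation path from (link_field, entity_type) hops.

    Hypothetical helper for illustration; it only formats the string and
    does not check the path against the ShotGrid schema.
    """
    parts: List[str] = []
    for link_field, entity_type in links:
        parts.extend([link_field, entity_type])
    parts.append(field)
    return ".".join(parts)


# Task -> Shot -> Sequence, as in the query above
sequence_code = hop_path([("entity", "Shot"), ("sg_sequence", "Sequence")], "code")
```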

Field Hopping in Filters

Field hopping can also be used in filters to create more precise queries:

@server.tool()
def find_shots_by_sequence_name(project_id: int, sequence_name: str) -> list:
    """Find shots in a specific sequence by name."""
    return server.connection.find(
        "Shot",
        [
            ["project", "is", {"type": "Project", "id": project_id}],
            # Field hopping in filter
            ["sg_sequence.Sequence.code", "is", sequence_name]
        ],
        ["code", "sg_status_list"]
    )

Implementing a Field Hopping Tool

Here’s a complete example of a tool that uses field hopping to efficiently retrieve related data:

@server.tool()
def search_entities_with_related(
    entity_type: str,
    filters: list,
    fields: list,
    related_fields: dict = None
) -> list:
    """
    Search for entities with related fields in a single query.
    
    Args:
        entity_type: The entity type to search for
        filters: List of filters to apply
        fields: List of fields to retrieve
        related_fields: Dictionary mapping related entity fields to their fields
                        Example: {"project": ["name", "code"], "sg_sequence": ["code"]}
    
    Returns:
        List of entities with related fields
    """
    # Start with the basic fields
    all_fields = list(fields)
    
    # Add related fields using dot notation
    if related_fields:
        for relation, rel_fields in related_fields.items():
            # Get the entity type for the relation
            if relation == "project":
                related_type = "Project"
            elif relation == "sg_sequence":
                related_type = "Sequence"
            elif relation == "entity":
                # For tasks, the entity could be any type
                # We'll assume Shot for this example
                related_type = "Shot"
            elif relation == "task_assignees":
                related_type = "HumanUser"
            else:
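                # Fallback guess: capitalize the field name. This only works
                # when the field is named after its entity type (e.g. "step");
                # a field such as "sg_versions" needs an explicit branch above.
                related_type = relation.capitalize()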
            
            # Add each related field with dot notation
            for field in rel_fields:
                all_fields.append(f"{relation}.{related_type}.{field}")
    
    # Execute the query with all fields
    return server.connection.find(
        entity_type,
        filters,
        all_fields
    )
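
The if/elif chain above can also be written as a lookup table, which makes the supported relations explicit and fails loudly on unknown ones instead of guessing. This is one possible sketch, not the server's implementation; `RELATION_TYPES` and `expand_related_fields` are hypothetical names, and the mapping would need to match your own site's schema.

```python
from typing import Dict, List, Optional

# Hypothetical mapping from link field name to linked entity type.
RELATION_TYPES: Dict[str, str] = {
    "project": "Project",
    "sg_sequence": "Sequence",
    "entity": "Shot",  # assumption: tasks are linked to shots only
    "task_assignees": "HumanUser",
}


def expand_related_fields(
    fields: List[str],
    related_fields: Optional[Dict[str, List[str]]] = None,
) -> List[str]:
    """Return fields plus dot-notation entries for each related field."""
    all_fields = list(fields)
    for relation, rel_fields in (related_fields or {}).items():
        if relation not in RELATION_TYPES:
            raise ValueError(f"No entity type registered for relation {relation!r}")
        related_type = RELATION_TYPES[relation]
        all_fields.extend(f"{relation}.{related_type}.{field}" for field in rel_fields)
    return all_fields


expanded = expand_related_fields(
    ["code"], {"sg_sequence": ["code"], "project": ["name"]}
)
```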

Batch Operations

Batch operations allow you to perform multiple operations in a single API call, significantly reducing the number of network round-trips.

Batch Creation

Create multiple entities in a single call:

@server.tool()
def batch_create_entities(
    entity_type: str,
    data_list: list
) -> list:
    """
    Create multiple entities in a single batch operation.
    
    Args:
        entity_type: The type of entities to create
        data_list: List of data dictionaries for each entity
    
    Returns:
        List of created entities
    """
    batch_data = []
    
    for data in data_list:
        batch_data.append({
            "request_type": "create",
            "entity_type": entity_type,
            "data": data
        })
    
    return server.connection.batch(batch_data)
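
Very large batches may be easier to handle when split into several smaller calls. The sketch below shows one way to chunk a request list; `chunk_requests` is a hypothetical helper and the default chunk size is an arbitrary assumption, not a documented limit. Note that the ShotGrid Python API runs each batch call as a single transaction, so splitting a batch means a failure in a later chunk no longer rolls back earlier chunks.

```python
from typing import Iterator, List


def chunk_requests(requests: List[dict], chunk_size: int = 500) -> Iterator[List[dict]]:
    """Yield consecutive slices of at most chunk_size requests.

    Hypothetical helper; the default of 500 is an illustrative value.
    """
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    for start in range(0, len(requests), chunk_size):
        yield requests[start:start + chunk_size]


# Made-up requests, split into chunks of two
sample_requests = [
    {"request_type": "create", "entity_type": "Shot", "data": {"code": f"SH{i:03d}"}}
    for i in range(5)
]
chunk_sizes = [len(chunk) for chunk in chunk_requests(sample_requests, chunk_size=2)]
```

Each chunk could then be passed to `server.connection.batch(chunk)` in turn.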

Mixed Batch Operations

Perform different types of operations in a single call:

@server.tool()
def batch_operations(operations: list) -> list:
    """
    Perform multiple operations in a single batch call.
    
    Args:
        operations: List of operation dictionaries, each with:
                   - request_type: "create", "update", or "delete"
                   - entity_type: Type of entity
                   - entity_id: ID for update/delete operations
                   - data: Data for create/update operations
    
    Returns:
        List of results for each operation
    """
    return server.connection.batch(operations)

Example usage:

# Example batch operations
operations = [
    # Create a sequence
    {
        "request_type": "create",
        "entity_type": "Sequence",
        "data": {
            "code": "SEQ001",
            "project": {"type": "Project", "id": 123}
        }
    },
    # Update a shot
    {
        "request_type": "update",
        "entity_type": "Shot",
        "entity_id": 456,
        "data": {
            "sg_status_list": "cmpt"
        }
    },
    # Delete a note
    {
        "request_type": "delete",
        "entity_type": "Note",
        "entity_id": 789
    }
]

# Execute all operations in a single API call
results = batch_operations(operations)
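
Because a batch is sent as a whole, it can help to check the operation dictionaries before making the call. The following sketch validates the keys described in the docstring above; `validate_operations` is a hypothetical helper and only checks structure, not whether the entities or fields exist.

```python
from typing import List

VALID_REQUEST_TYPES = {"create", "update", "delete"}


def validate_operations(operations: List[dict]) -> List[str]:
    """Return a list of human-readable problems; empty if none were found."""
    errors: List[str] = []
    for index, op in enumerate(operations):
        request_type = op.get("request_type")
        if request_type not in VALID_REQUEST_TYPES:
            errors.append(f"operation {index}: unknown request_type {request_type!r}")
            continue
        if "entity_type" not in op:
            errors.append(f"operation {index}: missing entity_type")
        # update and delete must identify an existing entity
        if request_type in ("update", "delete") and "entity_id" not in op:
            errors.append(f"operation {index}: {request_type} requires entity_id")
        # create and update must carry field values
        if request_type in ("create", "update") and "data" not in op:
            errors.append(f"operation {index}: {request_type} requires data")
    return errors


problems = validate_operations([
    {"request_type": "update", "entity_type": "Shot", "data": {"sg_status_list": "cmpt"}},
    {"request_type": "delete", "entity_type": "Note", "entity_id": 789},
])
```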

Efficient Filtering

Proper filtering is crucial for performance, especially when dealing with large datasets.

Compound Filters

Use compound filters to narrow down results efficiently:

@server.tool()
def find_recent_tasks(
    project_id: int,
    days: int = 7,
    status: str = None
) -> list:
    """Find tasks updated in the last N days."""
    import datetime
    
    # Calculate the date threshold
    threshold = datetime.datetime.now() - datetime.timedelta(days=days)
    
    # Build filters
    filters = [
        ["project", "is", {"type": "Project", "id": project_id}],
        ["updated_at", "greater_than", threshold]
    ]
    
    if status:
        filters.append(["sg_status_list", "is", status])
    
    return server.connection.find(
        "Task",
        filters,
        ["content", "sg_status_list", "updated_at", "task_assignees"]
    )
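
Top-level filters are combined with AND. The ShotGrid Python API also accepts nested filter groups, written as a dictionary with "filter_operator" and "filters" keys, which allows OR conditions inside a single query. A minimal sketch of building such a filter list follows; `any_of` and `build_task_filters` are hypothetical helpers and the status code "ip" is an example value.

```python
from typing import Any, Dict, List


def any_of(*conditions: list) -> Dict[str, Any]:
    """Group conditions so a record matches if at least one of them holds."""
    return {"filter_operator": "any", "filters": list(conditions)}


def build_task_filters(project_id: int, user_id: int) -> List[Any]:
    """Tasks in a project that are in progress OR assigned to a given user."""
    return [
        ["project", "is", {"type": "Project", "id": project_id}],
        any_of(
            ["sg_status_list", "is", "ip"],
            ["task_assignees", "is", {"type": "HumanUser", "id": user_id}],
        ),
    ]


task_filters = build_task_filters(project_id=123, user_id=42)
```

The resulting list can be passed as the filters argument of find in place of the flat list used above.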

Using "in" Filters

The "in" operator matches a field against a list of values, so a single query replaces one query per value:

@server.tool()
def find_entities_by_ids(
    entity_type: str,
    entity_ids: list
) -> list:
    """Find multiple entities by their IDs."""
    return server.connection.find(
        entity_type,
        [["id", "in", entity_ids]],
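        # "name" and "code" may not exist on every entity type; requesting
        # a missing field can cause the request to fail
        ["id", "name", "code"]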
    )
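
To make the difference concrete, the sketch below counts calls against a stand-in connection. `CountingConnection` is a hypothetical in-memory fake that only understands a single "is" or "in" filter; it is not the ShotGrid client.

```python
class CountingConnection:
    """In-memory stand-in that records how many find() calls are made."""

    def __init__(self, rows):
        self.rows = rows
        self.calls = 0

    def find(self, entity_type, filters, fields):
        self.calls += 1
        field, operator, value = filters[0]
        wanted = value if operator == "in" else [value]
        return [row for row in self.rows if row[field] in wanted]


def fetch_one_by_one(conn, ids):
    """N+1 pattern: one query per ID."""
    shots = []
    for shot_id in ids:
        shots.extend(conn.find("Shot", [["id", "is", shot_id]], ["code"]))
    return shots


def fetch_with_in(conn, ids):
    """Single query using the "in" operator."""
    return conn.find("Shot", [["id", "in", ids]], ["code"])


rows = [{"id": i, "code": f"SH{i:03d}"} for i in (1, 2, 3)]
slow_conn, fast_conn = CountingConnection(rows), CountingConnection(rows)
slow_result = fetch_one_by_one(slow_conn, [1, 2, 3])
fast_result = fetch_with_in(fast_conn, [1, 2, 3])
```

Both functions return the same rows, but the first makes one call per ID while the second makes a single call.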

Pagination

For large result sets, use pagination to retrieve data in manageable chunks:

@server.tool()
def find_entities_paged(
    entity_type: str,
    filters: list,
    fields: list,
    page: int = 1,
    page_size: int = 50
) -> dict:
    """
    Find entities with pagination.
    
    Args:
        entity_type: The entity type to search for
        filters: List of filters to apply
        fields: List of fields to retrieve
        page: Page number (starting from 1)
        page_size: Number of results per page
    
    Returns:
        Dictionary with results and pagination info
    """
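    # Get the total count with summarize(). This assumes server.connection
    # is a shotgun_api3 Shotgun instance, whose find_one() has no count mode.
    summary = server.connection.summarize(
        entity_type,
        filters,
        summary_fields=[{"field": "id", "type": "count"}]
    )
    total_count = summary["summaries"]["id"]
    
    # Get one page of results. find() takes a 1-based page number
    # together with limit, not an offset.
    results = server.connection.find(
        entity_type,
        filters,
        fields,
        limit=page_size,
        page=page
    )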
    
    # Calculate pagination info
    total_pages = (total_count + page_size - 1) // page_size
    has_next = page < total_pages
    has_prev = page > 1
    
    return {
        "results": results,
        "pagination": {
            "page": page,
            "page_size": page_size,
            "total_count": total_count,
            "total_pages": total_pages,
            "has_next": has_next,
            "has_prev": has_prev
        }
    }
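
When a caller needs every record rather than a single page, the pages can be walked one at a time until a short page signals the end. The sketch below is one way to do that; `iter_pages` is a hypothetical helper that takes any function returning the rows for a given page number, and the data is made up.

```python
from typing import Callable, Iterator, List


def iter_pages(fetch_page: Callable[[int], List[dict]], page_size: int) -> Iterator[dict]:
    """Yield rows page by page, stopping after the first short page."""
    page = 1
    while True:
        rows = fetch_page(page)
        yield from rows
        if len(rows) < page_size:
            break
        page += 1


# Stand-in for a paged query over five made-up records
data = [{"id": i} for i in range(1, 6)]
requested_pages = []


def fake_fetch(page: int, page_size: int = 2) -> List[dict]:
    requested_pages.append(page)
    start = (page - 1) * page_size
    return data[start:start + page_size]


all_rows = list(iter_pages(fake_fetch, page_size=2))
```

With the tool above, `fetch_page` could be a small function that calls `find_entities_paged(...)` for the given page and returns its "results" list. Stopping on a short page avoids needing the total count at all.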

Optimized Find One

When you only need a single entity, use find_one instead of find:

@server.tool()
def find_one_entity(
    entity_type: str,
    entity_id: int,
    fields: list = None
) -> dict:
    """Find a single entity by ID."""
    if fields is None:
        # Default fields; "name" and "code" may not exist on every entity type
        fields = ["id", "name", "code"]
    
    return server.connection.find_one(
        entity_type,
        [["id", "is", entity_id]],
        fields
    )

Selective Field Retrieval

Only request the fields you actually need:

@server.tool()
def find_shot_status(shot_id: int) -> dict:
    """Get just the status of a shot."""
    shot = server.connection.find_one(
        "Shot",
        [["id", "is", shot_id]],
        ["code", "sg_status_list"]  # Only request necessary fields
    )
    
    if not shot:
        raise ValueError(f"Shot with ID {shot_id} not found")
    
    return {
        "id": shot["id"],
        "code": shot["code"],
        "status": shot["sg_status_list"]
    }

Caching Strategies

Implement caching for frequently accessed data:

import time

class ShotGridCache:
    def __init__(self, server, ttl=300):  # 5 minutes TTL
        self.server = server
        self.ttl = ttl
        self.cache = {}
        self.timestamps = {}
    
    def get(self, key):
        """Get a value from the cache if it exists and is not expired."""
        if key in self.cache:
            # Check if the cache entry has expired
            if time.time() - self.timestamps[key] < self.ttl:
                return self.cache[key]
            else:
                # Remove expired entry
                del self.cache[key]
                del self.timestamps[key]
        return None
    
    def set(self, key, value):
        """Set a value in the cache with the current timestamp."""
        self.cache[key] = value
        self.timestamps[key] = time.time()
    
    def clear(self):
        """Clear the entire cache."""
        self.cache.clear()
        self.timestamps.clear()

# Create a cache instance
cache = ShotGridCache(server)

@server.tool()
def get_project_with_cache(project_id: int) -> dict:
    """Get a project, using cache if available."""
    cache_key = f"project_{project_id}"
    
    # Try to get from cache first
    cached_project = cache.get(cache_key)
    if cached_project is not None:
        return cached_project
    
    # Not in cache, fetch from ShotGrid
    project = server.connection.find_one(
        "Project",
        [["id", "is", project_id]],
        ["id", "name", "code", "sg_status"]
    )
    
    if project:
        # Store in cache for future use
        cache.set(cache_key, project)
    
    return project
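
A cached value becomes stale as soon as the underlying entity changes, so it is often useful to be able to drop a single entry after an update. The sketch below shows the same time-to-live idea as a decorator with per-key invalidation. `ttl_cached` is a hypothetical helper, and the injectable `clock` argument exists only to make the behavior easy to test.

```python
import time
from functools import wraps


def ttl_cached(ttl: float, clock=time.monotonic):
    """Cache results per positional arguments for ttl seconds."""
    def decorator(func):
        store = {}

        @wraps(func)
        def wrapper(*args):
            now = clock()
            entry = store.get(args)
            if entry is not None and now - entry[0] < ttl:
                return entry[1]
            value = func(*args)
            store[args] = (now, value)
            return value

        # Drop one entry, e.g. right after updating that entity
        wrapper.invalidate = lambda *args: store.pop(args, None)
        return wrapper
    return decorator


# Demonstration with a fake clock and a made-up loader
fake_now = [0.0]
load_calls = []


@ttl_cached(ttl=10, clock=lambda: fake_now[0])
def load_project(project_id):
    load_calls.append(project_id)
    return {"type": "Project", "id": project_id}


load_project(1)
load_project(1)              # served from the cache
fake_now[0] = 11.0
load_project(1)              # entry expired, loaded again
load_project.invalidate(1)
load_project(1)              # entry invalidated, loaded again
```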

For simpler caching needs, you can use Python’s built-in lru_cache:

from functools import lru_cache

@lru_cache(maxsize=100)
def get_entity_schema(entity_type):
    """Get entity schema with caching."""
    schema = server.schema_loader.get_schema()
    return schema.get(entity_type, {})

@server.tool()
def validate_entity_field(entity_type: str, field_name: str) -> bool:
    """Check if a field exists for an entity type (cached)."""
    entity_schema = get_entity_schema(entity_type)
    return field_name in entity_schema
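
Functions wrapped with lru_cache expose `cache_info()` and `cache_clear()`, which help to confirm that the cache is being hit and to reset it when the schema changes. A small sketch follows, using a hypothetical `load_schema` function in place of the real schema loader.

```python
from functools import lru_cache

load_count = 0


def load_schema() -> dict:
    """Hypothetical stand-in for an expensive schema request."""
    global load_count
    load_count += 1
    return {"Shot": {"code": {}, "sg_status_list": {}}}


@lru_cache(maxsize=100)
def get_entity_schema(entity_type: str) -> dict:
    return load_schema().get(entity_type, {})


get_entity_schema("Shot")
get_entity_schema("Shot")                # second call is a cache hit
info = get_entity_schema.cache_info()    # hits, misses, maxsize, currsize

get_entity_schema.cache_clear()          # e.g. after a schema change
get_entity_schema("Shot")                # loaded again
```

Unlike the TTL cache above, lru_cache never expires entries on its own, so an explicit `cache_clear()` is the only way to pick up changes.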

Best Practices Summary

  1. Use Field Hopping: Retrieve related entity data in a single query.
  2. Batch Operations: Combine multiple operations into a single API call.
  3. Efficient Filtering: Use compound filters and the "in" operator.
  4. Pagination: Use pagination for large result sets.
  5. Find One: Use find_one when you only need a single entity.
  6. Selective Fields: Only request the fields you actually need.
  7. Caching: Implement caching for frequently accessed data.
  8. Limit Results: Always use limits to avoid retrieving unnecessary data.
  9. Avoid N+1 Queries: Don’t make a separate query for each item in a list.
  10. Use Async Tools: For complex operations, use async tools to avoid blocking.
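
As an illustration of the last point, blocking calls can be moved off the event loop with `asyncio.to_thread` (Python 3.9+) and run concurrently. This is a sketch under assumptions: `blocking_find` is a hypothetical stand-in for a blocking ShotGrid call, and before sharing a real connection object across threads you should check whether it is safe to do so, or use one connection per thread.

```python
import asyncio
import time


def blocking_find(entity_type: str) -> list:
    """Hypothetical stand-in for a blocking query."""
    time.sleep(0.05)
    return [{"type": entity_type, "id": 1}]


async def fetch_many(entity_types):
    """Run the blocking calls in worker threads and wait for all of them."""
    tasks = [asyncio.to_thread(blocking_find, t) for t in entity_types]
    # gather() returns results in the same order as the tasks
    return await asyncio.gather(*tasks)


results = asyncio.run(fetch_many(["Shot", "Asset", "Task"]))
```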

By following these patterns, you can significantly improve the performance of your ShotGrid MCP Server and provide a better experience for your users.