ShotGrid MCP Server provides several techniques for optimizing queries to the ShotGrid API. This page covers best practices and patterns for writing efficient queries that minimize API calls and improve performance.
Field hopping (also known as dot notation) allows you to retrieve data from related entities in a single query, reducing the number of API calls needed.
@server.tool()def find_shots_with_sequence(project_id: int) -> list: """Find shots with their sequence information.""" return server.connection.find( "Shot", [["project", "is", {"type": "Project", "id": project_id}]], [ "code", "sg_status_list", # Field hopping to get sequence data "sg_sequence.Sequence.code", "sg_sequence.Sequence.description" ] )
This query retrieves shots and their related sequence information in a single API call, rather than requiring separate calls to fetch the sequence data.
Here’s a complete example of a tool that uses field hopping to efficiently retrieve related data:
Copy
@server.tool()def search_entities_with_related( entity_type: str, filters: list, fields: list, related_fields: dict = None) -> list: """ Search for entities with related fields in a single query. Args: entity_type: The entity type to search for filters: List of filters to apply fields: List of fields to retrieve related_fields: Dictionary mapping related entity fields to their fields Example: {"project": ["name", "code"], "sg_sequence": ["code"]} Returns: List of entities with related fields """ # Start with the basic fields all_fields = list(fields) # Add related fields using dot notation if related_fields: for relation, rel_fields in related_fields.items(): # Get the entity type for the relation if relation == "project": related_type = "Project" elif relation == "sg_sequence": related_type = "Sequence" elif relation == "entity": # For tasks, the entity could be any type # We'll assume Shot for this example related_type = "Shot" elif relation == "task_assignees": related_type = "HumanUser" else: # For other relations, we need to determine the type # This is a simplified approach related_type = relation.capitalize() # Add each related field with dot notation for field in rel_fields: all_fields.append(f"{relation}.{related_type}.{field}") # Execute the query with all fields return server.connection.find( entity_type, filters, all_fields )
@server.tool()def batch_create_entities( entity_type: str, data_list: list) -> list: """ Create multiple entities in a single batch operation. Args: entity_type: The type of entities to create data_list: List of data dictionaries for each entity Returns: List of created entities """ batch_data = [] for data in data_list: batch_data.append({ "request_type": "create", "entity_type": entity_type, "data": data }) return server.connection.batch(batch_data)
Perform different types of operations in a single call:
Copy
@server.tool()def batch_operations(operations: list) -> list: """ Perform multiple operations in a single batch call. Args: operations: List of operation dictionaries, each with: - request_type: "create", "update", or "delete" - entity_type: Type of entity - entity_id: ID for update/delete operations - data: Data for create/update operations Returns: List of results for each operation """ return server.connection.batch(operations)
Example usage:
Copy
# Example batch operationsoperations = [ # Create a sequence { "request_type": "create", "entity_type": "Sequence", "data": { "code": "SEQ001", "project": {"type": "Project", "id": 123} } }, # Update a shot { "request_type": "update", "entity_type": "Shot", "entity_id": 456, "data": { "sg_status_list": "cmpt" } }, # Delete a note { "request_type": "delete", "entity_type": "Note", "entity_id": 789 }]# Execute all operations in a single API callresults = batch_operations(operations)
@server.tool()def find_shot_status(shot_id: int) -> dict: """Get just the status of a shot.""" shot = server.connection.find_one( "Shot", [["id", "is", shot_id]], ["code", "sg_status_list"] # Only request necessary fields ) if not shot: raise ValueError(f"Shot with ID {shot_id} not found") return { "id": shot["id"], "code": shot["code"], "status": shot["sg_status_list"] }
import timefrom functools import lru_cacheclass ShotGridCache: def __init__(self, server, ttl=300): # 5 minutes TTL self.server = server self.ttl = ttl self.cache = {} self.timestamps = {} def get(self, key): """Get a value from the cache if it exists and is not expired.""" if key in self.cache: # Check if the cache entry has expired if time.time() - self.timestamps[key] < self.ttl: return self.cache[key] else: # Remove expired entry del self.cache[key] del self.timestamps[key] return None def set(self, key, value): """Set a value in the cache with the current timestamp.""" self.cache[key] = value self.timestamps[key] = time.time() def clear(self): """Clear the entire cache.""" self.cache.clear() self.timestamps.clear()# Create a cache instancecache = ShotGridCache(server)@server.tool()def get_project_with_cache(project_id: int) -> dict: """Get a project, using cache if available.""" cache_key = f"project_{project_id}" # Try to get from cache first cached_project = cache.get(cache_key) if cached_project: return cached_project # Not in cache, fetch from ShotGrid project = server.connection.find_one( "Project", [["id", "is", project_id]], ["id", "name", "code", "sg_status"] ) if project: # Store in cache for future use cache.set(cache_key, project) return project
For simpler caching needs, you can use Python’s built-in lru_cache:
Copy
from functools import lru_cache@lru_cache(maxsize=100)def get_entity_schema(entity_type): """Get entity schema with caching.""" schema = server.schema_loader.get_schema() return schema.get(entity_type, {})@server.tool()def validate_entity_field(entity_type: str, field_name: str) -> bool: """Check if a field exists for an entity type (cached).""" entity_schema = get_entity_schema(entity_type) return field_name in entity_schema