Batch Operations

Batch operations allow you to perform multiple ShotGrid operations in a single API call. This significantly reduces network overhead and improves performance, especially when creating, updating, or deleting multiple entities.

Why Use Batch Operations?

Batch operations provide several benefits:

  1. Reduced Network Overhead: One API call instead of many (see the sketch after this list).
  2. Improved Performance: Faster execution of multiple operations.
  3. Atomic Transactions: All operations succeed or fail together.
  4. Simplified Error Handling: Handle errors for multiple operations in one place.
  5. Reduced Server Load: Less strain on the ShotGrid server.
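
To make the savings concrete, here is a minimal sketch comparing individual calls with a single batch; the project ID and shot codes are hypothetical:

# Three separate API calls: one network round trip per shot
for code in ["SH010", "SH020", "SH030"]:
    server.connection.create("Shot", {
        "code": code,
        "project": {"type": "Project", "id": 123}  # hypothetical project ID
    })

# One API call: a single round trip creates all three shots
server.connection.batch([
    {
        "request_type": "create",
        "entity_type": "Shot",
        "data": {
            "code": code,
            "project": {"type": "Project", "id": 123}
        }
    }
    for code in ["SH010", "SH020", "SH030"]
])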

Basic Batch Operations

Batch Creation

Create multiple entities in a single API call:

@server.tool()
def batch_create_shots(
    project_id: int,
    sequence_id: int,
    shot_codes: list
) -> list:
    """
    Create multiple shots in a single batch operation.
    
    Args:
        project_id: ID of the project
        sequence_id: ID of the sequence
        shot_codes: List of shot codes to create
    
    Returns:
        List of created shots
    """
    batch_data = []
    
    for code in shot_codes:
        batch_data.append({
            "request_type": "create",
            "entity_type": "Shot",
            "data": {
                "code": code,
                "project": {"type": "Project", "id": project_id},
                "sg_sequence": {"type": "Sequence", "id": sequence_id},
                "sg_status_list": "ip"  # Default to In Progress
            }
        })
    
    return server.connection.batch(batch_data)
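
A hypothetical call, assuming the project and sequence IDs already exist:

new_shots = batch_create_shots(
    project_id=123,
    sequence_id=45,
    shot_codes=["SH010", "SH020", "SH030"]
)
# Each result is an entity dict, e.g. {"type": "Shot", "id": 1001, ...}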

Batch Updates

Update multiple entities in a single API call:

@server.tool()
def batch_update_task_status(
    task_ids: list,
    status: str
) -> list:
    """
    Update the status of multiple tasks in a single batch operation.
    
    Args:
        task_ids: List of task IDs to update
        status: New status to set
    
    Returns:
        List of updated tasks
    """
    batch_data = []
    
    for task_id in task_ids:
        batch_data.append({
            "request_type": "update",
            "entity_type": "Task",
            "entity_id": task_id,
            "data": {
                "sg_status_list": status
            }
        })
    
    return server.connection.batch(batch_data)
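
For example, marking a set of tasks as final (the task IDs are hypothetical):

updated = batch_update_task_status(task_ids=[201, 202, 203], status="fin")
# Each result is the updated entity dict for the corresponding task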

Batch Deletion

Delete multiple entities in a single API call:

@server.tool()
def batch_delete_notes(note_ids: list) -> list:
    """
    Delete multiple notes in a single batch operation.
    
    Args:
        note_ids: List of note IDs to delete
    
    Returns:
        List of results (True for each successful deletion)
    """
    batch_data = []
    
    for note_id in note_ids:
        batch_data.append({
            "request_type": "delete",
            "entity_type": "Note",
            "entity_id": note_id
        })
    
    return server.connection.batch(batch_data)
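
Delete requests return a boolean rather than an entity dict, so the result list can be checked directly (the note IDs are hypothetical):

results = batch_delete_notes(note_ids=[301, 302, 303])
if all(results):
    print("All notes deleted")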

Mixed Batch Operations

Perform different types of operations in a single batch:

@server.tool()
def batch_operations(operations: list) -> list:
    """
    Perform multiple operations in a single batch call.
    
    Args:
        operations: List of operation dictionaries, each with:
                   - request_type: "create", "update", or "delete"
                   - entity_type: Type of entity
                   - entity_id: ID for update/delete operations
                   - data: Data for create/update operations
    
    Returns:
        List of results for each operation
    """
    return server.connection.batch(operations)

Example usage:

# Example mixed batch operations
operations = [
    # Create a sequence
    {
        "request_type": "create",
        "entity_type": "Sequence",
        "data": {
            "code": "SEQ001",
            "project": {"type": "Project", "id": 123}
        }
    },
    # Update a shot
    {
        "request_type": "update",
        "entity_type": "Shot",
        "entity_id": 456,
        "data": {
            "sg_status_list": "cmpt"
        }
    },
    # Delete a note
    {
        "request_type": "delete",
        "entity_type": "Note",
        "entity_id": 789
    }
]

# Execute all operations in a single API call
results = batch_operations(operations)
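
Results come back in request order: create and update requests return the entity dict, while delete requests return a boolean. The example above can therefore be unpacked as:

new_sequence, updated_shot, note_deleted = results
print(new_sequence["id"])              # ID of the created sequence
print(updated_shot["sg_status_list"])  # "cmpt"
print(note_deleted)                    # True if the deletion succeeded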

Advanced Batch Patterns

Creating Entity Hierarchies

Create a parent entity and its children with minimal API calls. A batch request cannot reference an entity created earlier in the same batch, so the parent is created first and its ID is then used in the batched child requests:

@server.tool()
def create_sequence_with_shots(
    project_id: int,
    sequence_code: str,
    shot_count: int
) -> dict:
    """
    Create a sequence and its shots in a single batch operation.
    
    Args:
        project_id: ID of the project
        sequence_code: Code for the new sequence
        shot_count: Number of shots to create
    
    Returns:
        Dictionary with the created sequence and shots
    """
    batch_data = [
        # First, create the sequence
        {
            "request_type": "create",
            "entity_type": "Sequence",
            "data": {
                "code": sequence_code,
                "project": {"type": "Project", "id": project_id}
            }
        }
    ]
    
    # The batch results will be returned in order, so the first result
    # will be the sequence we just created
    
    # Now add operations to create shots
    # We'll use a placeholder for the sequence ID that will be replaced
    # with the actual ID after the batch operation
    for i in range(1, shot_count + 1):
        batch_data.append({
            "request_type": "create",
            "entity_type": "Shot",
            "data": {
                "code": f"{sequence_code}_{i:03d}",
                "project": {"type": "Project", "id": project_id},
                # We'll update this after getting the sequence ID
                "sg_sequence": {"type": "Sequence", "id": None}
            }
        })
    
    # Execute the batch operation
    results = server.connection.batch(batch_data)
    
    # Get the sequence from the first result
    sequence = results[0]
    
    # Update the shots with the correct sequence ID
    shots = []
    for i in range(1, len(results)):
        shot = results[i]
        # Update the shot with the correct sequence ID
        server.connection.update(
            "Shot",
            shot["id"],
            {"sg_sequence": {"type": "Sequence", "id": sequence["id"]}}
        )
        shots.append(shot)
    
    return {
        "sequence": sequence,
        "shots": shots
    }

Batch with Validation

Validate data before performing batch operations:

@server.tool()
def batch_create_tasks_with_validation(
    entity_type: str,
    entity_id: int,
    task_data_list: list
) -> dict:
    """
    Create multiple tasks with validation.
    
    Args:
        entity_type: Type of entity to attach tasks to
        entity_id: ID of the entity
        task_data_list: List of task data dictionaries
    
    Returns:
        Dictionary with results and validation info
    """
    # Validate entity exists
    entity = server.connection.find_one(
        entity_type,
        [["id", "is", entity_id]],
        ["id", "code"]
    )
    
    if not entity:
        raise ValueError(f"{entity_type} with ID {entity_id} not found")
    
    # Validate task data
    valid_tasks = []
    invalid_tasks = []
    
    for i, task_data in enumerate(task_data_list):
        # Check required fields
        if "content" not in task_data:
            invalid_tasks.append({
                "index": i,
                "data": task_data,
                "error": "Missing required field: content"
            })
            continue
        
        # Check status if provided (status codes are site-configurable,
        # so adjust this list to match your ShotGrid site)
        if "sg_status_list" in task_data:
            valid_statuses = ["wtg", "rdy", "ip", "cmpt", "fin"]
            if task_data["sg_status_list"] not in valid_statuses:
                invalid_tasks.append({
                    "index": i,
                    "data": task_data,
                    "error": f"Invalid status: {task_data['sg_status_list']}"
                })
                continue
        
        # Task is valid
        valid_tasks.append(task_data)
    
    # If there are invalid tasks, return without creating any
    if invalid_tasks:
        return {
            "success": False,
            "message": "Validation failed",
            "invalid_tasks": invalid_tasks
        }
    
    # Create batch data for valid tasks
    batch_data = []
    
    for task_data in valid_tasks:
        # Ensure entity is set
        task_data["entity"] = {"type": entity_type, "id": entity_id}
        
        batch_data.append({
            "request_type": "create",
            "entity_type": "Task",
            "data": task_data
        })
    
    # Execute batch operation
    results = server.connection.batch(batch_data)
    
    return {
        "success": True,
        "message": f"Created {len(results)} tasks",
        "tasks": results
    }
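
A hypothetical call in which one entry fails validation:

result = batch_create_tasks_with_validation(
    entity_type="Shot",
    entity_id=456,
    task_data_list=[
        {"content": "Animation", "sg_status_list": "rdy"},
        {"sg_status_list": "wtg"}  # missing "content", so validation fails
    ]
)
# result["success"] is False, result["invalid_tasks"] identifies the
# bad entry, and no tasks are created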

Batch with Retry Logic

Implement retry logic for batch operations:

import time
from shotgun_api3.shotgun import ShotgunError

@server.tool()
def batch_with_retry(
    operations: list,
    max_retries: int = 3,
    retry_delay: float = 1.0
) -> dict:
    """
    Perform batch operations with retry logic.
    
    Args:
        operations: List of operation dictionaries
        max_retries: Maximum number of retry attempts
        retry_delay: Delay between retries in seconds
    
    Returns:
        Dictionary with results and retry info
    """
    retries = 0
    last_error = None
    
    while retries <= max_retries:
        try:
            # Attempt the batch operation
            results = server.connection.batch(operations)
            
            # If successful, return the results
            return {
                "success": True,
                "results": results,
                "retries": retries
            }
        except ShotgunError as e:
            # Store the error
            last_error = str(e)
            
            # Increment retry counter
            retries += 1
            
            if retries <= max_retries:
                # Wait before retrying
                time.sleep(retry_delay)
                # Increase delay for next retry (exponential backoff)
                retry_delay *= 2
            else:
                # Max retries reached, give up
                break
    
    # If we get here, all retries failed
    return {
        "success": False,
        "error": last_error,
        "retries": retries
    }
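
A hypothetical call; the retry metadata makes it easy to log how many attempts were needed:

result = batch_with_retry(operations, max_retries=3, retry_delay=1.0)
if result["success"]:
    print(f"Succeeded after {result['retries']} retries")
else:
    print(f"Gave up after {result['retries']} attempts: {result['error']}")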

Chunking Large Batches

For very large batches, split them into smaller chunks. Each chunk is a separate API call and its own transaction, so a failure partway through leaves earlier chunks committed:

@server.tool()
def batch_create_chunked(
    entity_type: str,
    data_list: list,
    chunk_size: int = 50
) -> dict:
    """
    Create entities in chunks to handle large batches.
    
    Args:
        entity_type: Type of entities to create
        data_list: List of data dictionaries
        chunk_size: Maximum number of operations per batch
    
    Returns:
        Dictionary with results from all chunks
    """
    all_results = []
    chunk_count = 0
    
    # Split the data list into chunks
    for i in range(0, len(data_list), chunk_size):
        chunk = data_list[i:i + chunk_size]
        chunk_count += 1
        
        # Create batch data for this chunk
        batch_data = []
        for data in chunk:
            batch_data.append({
                "request_type": "create",
                "entity_type": entity_type,
                "data": data
            })
        
        # Execute batch for this chunk
        chunk_results = server.connection.batch(batch_data)
        all_results.extend(chunk_results)
    
    return {
        "success": True,
        "total_entities": len(all_results),
        "chunk_count": chunk_count,
        "results": all_results
    }

Asynchronous Batch Processing

For very large operations, use asynchronous processing:

@server.tool()
async def batch_process_async(
    operations: list,
    chunk_size: int = 50
) -> dict:
    """
    Process batch operations asynchronously.
    
    Args:
        operations: List of operation dictionaries
        chunk_size: Maximum number of operations per batch
    
    Returns:
        Dictionary with results from all chunks
    """
    import asyncio
    
    all_results = []
    
    # Split operations into chunks
    chunks = [operations[i:i + chunk_size] for i in range(0, len(operations), chunk_size)]
    chunk_count = len(chunks)
    
    # Process each chunk
    async def process_chunk(chunk):
        # Get a connection from the pool
        async with server.connection_pool.connection() as sg:
            return sg.batch(chunk)
    
    # Create tasks for all chunks
    tasks = [process_chunk(chunk) for chunk in chunks]
    
    # Execute all chunks concurrently
    chunk_results = await asyncio.gather(*tasks)
    
    # Combine results from all chunks
    for results in chunk_results:
        all_results.extend(results)
    
    return {
        "success": True,
        "total_operations": len(all_results),
        "chunk_count": chunk_count,
        "results": all_results
    }
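
The example above assumes the server exposes an async connection pool (server.connection_pool). If you only have a plain shotgun_api3 connection, which is synchronous, here is a sketch using asyncio.to_thread (Python 3.9+) to keep the blocking batch() calls off the event loop. Note that sharing one connection across threads may not be safe, so creating one connection per worker is the conservative choice:

import asyncio

async def process_chunks_threaded(chunks: list) -> list:
    # Run each synchronous batch() call in a worker thread so the
    # event loop stays responsive while waiting on the network
    tasks = [
        asyncio.to_thread(server.connection.batch, chunk)
        for chunk in chunks
    ]
    chunk_results = await asyncio.gather(*tasks)
    # Flatten the per-chunk result lists into one list
    return [result for results in chunk_results for result in results]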

Best Practices

  1. Batch Size: Keep batch sizes reasonable (50-100 operations per batch).
  2. Validation: Validate data before performing batch operations.
  3. Error Handling: Implement proper error handling for batch operations.
  4. Chunking: Split large batches into smaller chunks.
  5. Retry Logic: Implement retry logic for transient errors.
  6. Transactions: ShotGrid batch requests are transactional; if any request fails, the entire batch fails and no changes are committed. When you chunk a large job, each chunk is its own transaction.
  7. Logging: Log batch operations for debugging and auditing (see the sketch after this list).
  8. Testing: Test batch operations thoroughly, especially with edge cases.
  9. Monitoring: Monitor batch operation performance and adjust chunk sizes as needed.
  10. Async Processing: Use asynchronous processing for very large batches.
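
As a minimal sketch of the logging practice (item 7), a wrapper that records batch size, duration, and failures:

import logging
import time

logger = logging.getLogger(__name__)

def logged_batch(operations: list) -> list:
    """Run a batch and log its size, duration, and outcome."""
    start = time.monotonic()
    try:
        results = server.connection.batch(operations)
        logger.info(
            "Batch of %d operations succeeded in %.2fs",
            len(operations), time.monotonic() - start
        )
        return results
    except Exception:
        logger.exception("Batch of %d operations failed", len(operations))
        raise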

Complete Example

Here’s a complete example that demonstrates many batch operation patterns:

@server.tool()
async def create_production_setup(
    project_name: str,
    sequence_count: int = 3,
    shots_per_sequence: int = 5,
    task_templates: list = None
) -> dict:
    """
    Create a complete production setup with project, sequences, shots, and tasks.
    
    Args:
        project_name: Name of the project to create
        sequence_count: Number of sequences to create
        shots_per_sequence: Number of shots per sequence
        task_templates: List of task templates to apply to shots
    
    Returns:
        Dictionary with created entities
    """
    if task_templates is None:
        task_templates = [
            {"content": "Animation", "sg_status_list": "rdy"},
            {"content": "Lighting", "sg_status_list": "wtg"},
            {"content": "Compositing", "sg_status_list": "wtg"}
        ]
    
    # Create project
    project = server.connection.create("Project", {
        "name": project_name,
        "code": "".join(word[0] for word in project_name.split()).upper()
    })
    
    # Create sequences
    sequence_batch = []
    for i in range(1, sequence_count + 1):
        sequence_batch.append({
            "request_type": "create",
            "entity_type": "Sequence",
            "data": {
                "code": f"SEQ{i:03d}",
                "project": {"type": "Project", "id": project["id"]}
            }
        })
    
    sequences = server.connection.batch(sequence_batch)
    
    # Create shots for each sequence
    all_shots = []
    for sequence in sequences:
        shot_batch = []
        for j in range(1, shots_per_sequence + 1):
            shot_batch.append({
                "request_type": "create",
                "entity_type": "Shot",
                "data": {
                    "code": f"{sequence['code']}_{j:03d}",
                    "project": {"type": "Project", "id": project["id"]},
                    "sg_sequence": {"type": "Sequence", "id": sequence["id"]},
                    "sg_status_list": "ip"
                }
            })
        
        shots = server.connection.batch(shot_batch)
        all_shots.extend(shots)
    
    # Create tasks for each shot
    all_tasks = []
    
    # Process shots in chunks to avoid too large batches
    shot_chunks = [all_shots[i:i + 10] for i in range(0, len(all_shots), 10)]
    
    # Process each chunk asynchronously
    async def process_shot_chunk(shots):
        task_batch = []
        for shot in shots:
            for template in task_templates:
                task_batch.append({
                    "request_type": "create",
                    "entity_type": "Task",
                    "data": {
                        "content": template["content"],
                        "sg_status_list": template["sg_status_list"],
                        "entity": {"type": "Shot", "id": shot["id"]},
                        "project": {"type": "Project", "id": project["id"]}
                    }
                })
        
        # Get a connection from the pool
        async with server.connection_pool.connection() as sg:
            return sg.batch(task_batch)
    
    # Create tasks for all chunks concurrently
    import asyncio
    tasks = [process_shot_chunk(chunk) for chunk in shot_chunks]
    chunk_results = await asyncio.gather(*tasks)
    
    # Combine results from all chunks
    for results in chunk_results:
        all_tasks.extend(results)
    
    return {
        "project": project,
        "sequences": sequences,
        "shots": all_shots,
        "tasks": all_tasks,
        "summary": {
            "project_count": 1,
            "sequence_count": len(sequences),
            "shot_count": len(all_shots),
            "task_count": len(all_tasks)
        }
    }

Next Steps

Now that you understand batch operations, you can: