Batch Operations
Efficiently perform multiple operations in a single API call
Batch operations allow you to perform multiple ShotGrid operations in a single API call. This significantly reduces network overhead and improves performance, especially when creating, updating, or deleting multiple entities.
Why Use Batch Operations?
Batch operations provide several benefits (see the sketch after this list):
- Reduced Network Overhead: One API call instead of many.
- Improved Performance: Faster execution of multiple operations.
- Atomic Transactions: All operations succeed or fail together.
- Simplified Error Handling: Handle errors for multiple operations in one place.
- Reduced Server Load: Less strain on the ShotGrid server.
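To see the difference, here is a minimal sketch contrasting per-entity calls with a single batch request. It uses the same `server.connection` ShotGrid connection as the rest of this page; the shot codes and project ID are illustrative:
# Without batching: one API round trip per shot
for code in ["SH010", "SH020", "SH030"]:
    server.connection.create("Shot", {
        "code": code,
        "project": {"type": "Project", "id": 123}  # illustrative project ID
    })

# With batching: one round trip creates all three shots
server.connection.batch([
    {
        "request_type": "create",
        "entity_type": "Shot",
        "data": {"code": code, "project": {"type": "Project", "id": 123}}
    }
    for code in ["SH010", "SH020", "SH030"]
])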
Basic Batch Operations
Batch Creation
Create multiple entities in a single API call:
@server.tool()
def batch_create_shots(
    project_id: int,
    sequence_id: int,
    shot_codes: list
) -> list:
    """
    Create multiple shots in a single batch operation.

    Args:
        project_id: ID of the project
        sequence_id: ID of the sequence
        shot_codes: List of shot codes to create

    Returns:
        List of created shots
    """
    batch_data = []
    for code in shot_codes:
        batch_data.append({
            "request_type": "create",
            "entity_type": "Shot",
            "data": {
                "code": code,
                "project": {"type": "Project", "id": project_id},
                "sg_sequence": {"type": "Sequence", "id": sequence_id},
                "sg_status_list": "ip"  # Default to In Progress
            }
        })
    return server.connection.batch(batch_data)
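A hypothetical call might look like this (the IDs are illustrative):
# Hypothetical usage -- project and sequence IDs are illustrative
shots = batch_create_shots(
    project_id=123,
    sequence_id=456,
    shot_codes=["SH010", "SH020", "SH030"]
)
# Each entry is a created entity dict, e.g. {"type": "Shot", "id": 1001, "code": "SH010", ...}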
Batch Updates
Update multiple entities in a single API call:
@server.tool()
def batch_update_task_status(
    task_ids: list,
    status: str
) -> list:
    """
    Update the status of multiple tasks in a single batch operation.

    Args:
        task_ids: List of task IDs to update
        status: New status to set

    Returns:
        List of updated tasks
    """
    batch_data = []
    for task_id in task_ids:
        batch_data.append({
            "request_type": "update",
            "entity_type": "Task",
            "entity_id": task_id,
            "data": {
                "sg_status_list": status
            }
        })
    return server.connection.batch(batch_data)
Batch Deletion
Delete multiple entities in a single API call:
@server.tool()
def batch_delete_notes(note_ids: list) -> list:
    """
    Delete multiple notes in a single batch operation.

    Args:
        note_ids: List of note IDs to delete

    Returns:
        List of results (True for each successful deletion)
    """
    batch_data = []
    for note_id in note_ids:
        batch_data.append({
            "request_type": "delete",
            "entity_type": "Note",
            "entity_id": note_id
        })
    return server.connection.batch(batch_data)
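For example (the note IDs are illustrative), a successful run returns one `True` per deleted note:
# Hypothetical usage -- note IDs are illustrative
results = batch_delete_notes([101, 102, 103])
print(results)  # [True, True, True] when every deletion succeeds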
Mixed Batch Operations
Perform different types of operations in a single batch:
@server.tool()
def batch_operations(operations: list) -> list:
    """
    Perform multiple operations in a single batch call.

    Args:
        operations: List of operation dictionaries, each with:
            - request_type: "create", "update", or "delete"
            - entity_type: Type of entity
            - entity_id: ID for update/delete operations
            - data: Data for create/update operations

    Returns:
        List of results for each operation
    """
    return server.connection.batch(operations)
Example usage:
# Example mixed batch operations
operations = [
    # Create a sequence
    {
        "request_type": "create",
        "entity_type": "Sequence",
        "data": {
            "code": "SEQ001",
            "project": {"type": "Project", "id": 123}
        }
    },
    # Update a shot
    {
        "request_type": "update",
        "entity_type": "Shot",
        "entity_id": 456,
        "data": {
            "sg_status_list": "cmpt"
        }
    },
    # Delete a note
    {
        "request_type": "delete",
        "entity_type": "Note",
        "entity_id": 789
    }
]

# Execute all operations in a single API call
results = batch_operations(operations)
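The results come back in the same order as the requests, so you can unpack them positionally:
# Results are returned in request order
new_sequence, updated_shot, delete_ok = results
print(new_sequence["id"])  # ID of the sequence created above
print(delete_ok)           # True if the note was deleted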
Advanced Batch Patterns
Creating Related Entities
Create a parent entity together with its children. A ShotGrid batch request cannot reference the ID of an entity created earlier in the same batch, so create the parent first, then batch-create all of its children using the real parent ID:
@server.tool()
def create_sequence_with_shots(
    project_id: int,
    sequence_code: str,
    shot_count: int
) -> dict:
    """
    Create a sequence, then create all of its shots in one batch operation.

    Args:
        project_id: ID of the project
        sequence_code: Code for the new sequence
        shot_count: Number of shots to create

    Returns:
        Dictionary with the created sequence and shots
    """
    # Create the sequence first so its ID can be referenced by the shots
    sequence = server.connection.create("Sequence", {
        "code": sequence_code,
        "project": {"type": "Project", "id": project_id}
    })

    # Batch-create the shots, linking each one to the new sequence
    batch_data = []
    for i in range(1, shot_count + 1):
        batch_data.append({
            "request_type": "create",
            "entity_type": "Shot",
            "data": {
                "code": f"{sequence_code}_{i:03d}",
                "project": {"type": "Project", "id": project_id},
                "sg_sequence": {"type": "Sequence", "id": sequence["id"]}
            }
        })
    shots = server.connection.batch(batch_data)

    return {
        "sequence": sequence,
        "shots": shots
    }
Batch with Validation
Validate data before performing batch operations:
@server.tool()
def batch_create_tasks_with_validation(
    entity_type: str,
    entity_id: int,
    task_data_list: list
) -> dict:
    """
    Create multiple tasks with validation.

    Args:
        entity_type: Type of entity to attach tasks to
        entity_id: ID of the entity
        task_data_list: List of task data dictionaries

    Returns:
        Dictionary with results and validation info
    """
    # Validate that the target entity exists
    entity = server.connection.find_one(
        entity_type,
        [["id", "is", entity_id]],
        ["id", "code"]
    )
    if not entity:
        raise ValueError(f"{entity_type} with ID {entity_id} not found")

    # Validate task data
    valid_tasks = []
    invalid_tasks = []
    for i, task_data in enumerate(task_data_list):
        # Check required fields
        if "content" not in task_data:
            invalid_tasks.append({
                "index": i,
                "data": task_data,
                "error": "Missing required field: content"
            })
            continue
        # Check status if provided (adjust to the statuses configured on your site)
        if "sg_status_list" in task_data:
            valid_statuses = ["wtg", "rdy", "ip", "cmpt", "fin"]
            if task_data["sg_status_list"] not in valid_statuses:
                invalid_tasks.append({
                    "index": i,
                    "data": task_data,
                    "error": f"Invalid status: {task_data['sg_status_list']}"
                })
                continue
        # Task is valid
        valid_tasks.append(task_data)

    # If there are invalid tasks, return without creating any
    if invalid_tasks:
        return {
            "success": False,
            "message": "Validation failed",
            "invalid_tasks": invalid_tasks
        }

    # Create batch data for valid tasks
    batch_data = []
    for task_data in valid_tasks:
        # Ensure the task is linked to the target entity
        task_data["entity"] = {"type": entity_type, "id": entity_id}
        batch_data.append({
            "request_type": "create",
            "entity_type": "Task",
            "data": task_data
        })

    # Execute batch operation
    results = server.connection.batch(batch_data)
    return {
        "success": True,
        "message": f"Created {len(results)} tasks",
        "tasks": results
    }
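For instance, a hypothetical call in which one task is missing its `content` field returns the validation errors without creating anything:
# Hypothetical usage -- the second task is missing "content"
result = batch_create_tasks_with_validation(
    entity_type="Shot",
    entity_id=456,
    task_data_list=[
        {"content": "Animation", "sg_status_list": "rdy"},
        {"sg_status_list": "wtg"},
    ]
)
print(result["success"])        # False
print(result["invalid_tasks"])  # [{"index": 1, ..., "error": "Missing required field: content"}]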
Batch with Retry Logic
Implement retry logic for batch operations. Because a batch runs as one transaction, a failed batch normally leaves nothing behind; be aware, though, that a timeout after the server has already committed can still produce duplicate creates on retry:
import time
from shotgun_api3.shotgun import ShotgunError

@server.tool()
def batch_with_retry(
    operations: list,
    max_retries: int = 3,
    retry_delay: float = 1.0
) -> dict:
    """
    Perform batch operations with retry logic.

    Args:
        operations: List of operation dictionaries
        max_retries: Maximum number of retry attempts
        retry_delay: Delay between retries in seconds

    Returns:
        Dictionary with results and retry info
    """
    retries = 0
    last_error = None
    while retries <= max_retries:
        try:
            # Attempt the batch operation
            results = server.connection.batch(operations)
            # If successful, return the results
            return {
                "success": True,
                "results": results,
                "retries": retries
            }
        except ShotgunError as e:
            # Store the error and increment the retry counter
            last_error = str(e)
            retries += 1
            if retries <= max_retries:
                # Wait before retrying, doubling the delay each time
                # (exponential backoff)
                time.sleep(retry_delay)
                retry_delay *= 2
            else:
                # Max retries reached, give up
                break

    # If we get here, all retries failed
    return {
        "success": False,
        "error": last_error,
        "retries": retries
    }
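Calling it looks the same as a plain batch; the result reports how many retries were needed:
# Hypothetical usage with the mixed operations list from earlier
result = batch_with_retry(operations, max_retries=3, retry_delay=1.0)
if result["success"]:
    print(f"Succeeded after {result['retries']} retries")
else:
    print(f"Giving up: {result['error']}")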
Chunking Large Batches
For very large batches, split them into smaller chunks:
@server.tool()
def batch_create_chunked(
    entity_type: str,
    data_list: list,
    chunk_size: int = 50
) -> dict:
    """
    Create entities in chunks to handle large batches.

    Args:
        entity_type: Type of entities to create
        data_list: List of data dictionaries
        chunk_size: Maximum number of operations per batch

    Returns:
        Dictionary with results from all chunks
    """
    all_results = []
    chunk_count = 0

    # Split the data list into chunks
    for i in range(0, len(data_list), chunk_size):
        chunk = data_list[i:i + chunk_size]
        chunk_count += 1

        # Create batch data for this chunk
        batch_data = []
        for data in chunk:
            batch_data.append({
                "request_type": "create",
                "entity_type": entity_type,
                "data": data
            })

        # Execute batch for this chunk
        chunk_results = server.connection.batch(batch_data)
        all_results.extend(chunk_results)

    return {
        "success": True,
        "total_entities": len(all_results),
        "chunk_count": chunk_count,
        "results": all_results
    }
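For example, creating 120 assets with the default chunk size of 50 issues three batch calls (50 + 50 + 20); the entity type and data below are illustrative:
# Hypothetical usage -- 120 asset dicts split into chunks of 50
asset_data = [
    {"code": f"ASSET_{i:03d}", "project": {"type": "Project", "id": 123}}
    for i in range(1, 121)
]
result = batch_create_chunked("Asset", asset_data, chunk_size=50)
print(result["chunk_count"])     # 3
print(result["total_entities"])  # 120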
Asynchronous Batch Processing
For very large operations, use asynchronous processing:
@server.tool()
async def batch_process_async(
    operations: list,
    chunk_size: int = 50
) -> dict:
    """
    Process batch operations asynchronously.

    Args:
        operations: List of operation dictionaries
        chunk_size: Maximum number of operations per batch

    Returns:
        Dictionary with results from all chunks
    """
    import asyncio

    # Split operations into chunks
    chunks = [operations[i:i + chunk_size] for i in range(0, len(operations), chunk_size)]

    async def process_chunk(chunk):
        # Get a connection from the pool
        async with server.connection_pool.connection() as sg:
            # sg.batch() is a blocking call, so run it in a worker thread
            # to let the chunks actually execute concurrently
            return await asyncio.to_thread(sg.batch, chunk)

    # Execute all chunks concurrently
    chunk_results = await asyncio.gather(*(process_chunk(chunk) for chunk in chunks))

    # Combine results from all chunks
    all_results = []
    for results in chunk_results:
        all_results.extend(results)

    return {
        "success": True,
        "total_operations": len(all_results),
        "chunk_count": len(chunks),
        "results": all_results
    }
Best Practices
- Batch Size: Keep batch sizes reasonable (50-100 operations per batch).
- Validation: Validate data before performing batch operations.
- Error Handling: Implement proper error handling for batch operations.
- Chunking: Split large batches into smaller chunks.
- Retry Logic: Implement retry logic for transient errors.
- Transactions: ShotGrid batch requests run as a single transaction, so a failure in one operation rolls back the entire batch; design batches around that all-or-nothing behavior.
- Logging: Log batch operations for debugging and auditing (see the sketch after this list).
- Testing: Test batch operations thoroughly, especially with edge cases.
- Monitoring: Monitor batch operation performance and adjust chunk sizes as needed.
- Async Processing: Use asynchronous processing for very large batches.
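As one way to cover the logging and monitoring points above, here is a minimal sketch of a wrapper that logs the size and duration of each batch call. The logger name and the `logged_batch` helper are assumptions for illustration, not part of the ShotGrid API:
import logging
import time

logger = logging.getLogger("shotgrid.batch")

def logged_batch(sg, batch_data):
    """Hypothetical wrapper: run a batch call and log its size and duration."""
    start = time.monotonic()
    try:
        results = sg.batch(batch_data)
    except Exception:
        logger.exception("Batch of %d operations failed", len(batch_data))
        raise
    elapsed = time.monotonic() - start
    logger.info("Batch of %d operations completed in %.2fs", len(batch_data), elapsed)
    return results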
Complete Example
Here’s a complete example that demonstrates many batch operation patterns:
@server.tool()
async def create_production_setup(
    project_name: str,
    sequence_count: int = 3,
    shots_per_sequence: int = 5,
    task_templates: list = None
) -> dict:
    """
    Create a complete production setup with project, sequences, shots, and tasks.

    Args:
        project_name: Name of the project to create
        sequence_count: Number of sequences to create
        shots_per_sequence: Number of shots per sequence
        task_templates: List of task templates to apply to shots

    Returns:
        Dictionary with created entities
    """
    import asyncio

    if task_templates is None:
        task_templates = [
            {"content": "Animation", "sg_status_list": "rdy"},
            {"content": "Lighting", "sg_status_list": "wtg"},
            {"content": "Compositing", "sg_status_list": "wtg"}
        ]

    # Create project
    project = server.connection.create("Project", {
        "name": project_name,
        "code": "".join(word[0] for word in project_name.split()).upper()
    })

    # Create sequences
    sequence_batch = []
    for i in range(1, sequence_count + 1):
        sequence_batch.append({
            "request_type": "create",
            "entity_type": "Sequence",
            "data": {
                "code": f"SEQ{i:03d}",
                "project": {"type": "Project", "id": project["id"]}
            }
        })
    sequences = server.connection.batch(sequence_batch)

    # Create shots for each sequence
    all_shots = []
    for sequence in sequences:
        shot_batch = []
        for j in range(1, shots_per_sequence + 1):
            shot_batch.append({
                "request_type": "create",
                "entity_type": "Shot",
                "data": {
                    "code": f"{sequence['code']}_{j:03d}",
                    "project": {"type": "Project", "id": project["id"]},
                    "sg_sequence": {"type": "Sequence", "id": sequence["id"]},
                    "sg_status_list": "ip"
                }
            })
        shots = server.connection.batch(shot_batch)
        all_shots.extend(shots)

    # Create tasks for each shot, processing shots in chunks
    # to keep each batch at a reasonable size
    all_tasks = []
    shot_chunks = [all_shots[i:i + 10] for i in range(0, len(all_shots), 10)]

    async def process_shot_chunk(shots):
        task_batch = []
        for shot in shots:
            for template in task_templates:
                task_batch.append({
                    "request_type": "create",
                    "entity_type": "Task",
                    "data": {
                        "content": template["content"],
                        "sg_status_list": template["sg_status_list"],
                        "entity": {"type": "Shot", "id": shot["id"]},
                        "project": {"type": "Project", "id": project["id"]}
                    }
                })
        # Get a connection from the pool and run the blocking batch
        # call in a worker thread so chunks execute concurrently
        async with server.connection_pool.connection() as sg:
            return await asyncio.to_thread(sg.batch, task_batch)

    # Create tasks for all chunks concurrently
    chunk_results = await asyncio.gather(
        *(process_shot_chunk(chunk) for chunk in shot_chunks)
    )
    for results in chunk_results:
        all_tasks.extend(results)

    return {
        "project": project,
        "sequences": sequences,
        "shots": all_shots,
        "tasks": all_tasks,
        "summary": {
            "project_count": 1,
            "sequence_count": len(sequences),
            "shot_count": len(all_shots),
            "task_count": len(all_tasks)
        }
    }
Next Steps
Now that you understand batch operations, you can:
- Learn about optimized queries for better performance
- Explore error handling patterns for robust applications