The Batch API allows you to run multiple deep research tasks in parallel with shared configuration and progress monitoring.
For a conceptual overview, the batch lifecycle, and best practices, see the Batch Processing Guide. This page is a reference for the Python SDK methods.
Quick Start
```python
from valyu import Valyu

client = Valyu()

# Create batch, add tasks, wait for completion
batch = client.batch.create(name="Research Batch", mode="standard")
client.batch.add_tasks(batch.batch_id, [
    {"query": "Research AI trends"},
    {"query": "Analyze market data"},
])
result = client.batch.wait_for_completion(batch.batch_id)
print(f"Completed: {result.batch.counts.completed} tasks")
```
Initialization
The Batch API is accessed through the batch property of your Valyu client:
```python
from valyu import Valyu

client = Valyu(api_key="your-api-key")
batch_client = client.batch
```
Methods
create()
Create a new batch with default settings that will apply to all tasks.
Parameters:
| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `name` | `str` | `None` | Optional name for the batch |
| `mode` | `Literal["standard", "heavy", "fast"]` | `"standard"` | Research mode: `"standard"` (default), `"heavy"` (comprehensive), or `"fast"` (faster completion). The `lite` mode has been replaced by `fast`. |
| `output_formats` | `List[Union[Literal["markdown", "pdf", "toon"], Dict]]` | `None` | Output formats: `["markdown"]`, `["pdf"]`, `["toon"]`, or a JSON schema object. Cannot mix a JSON schema with `"markdown"`/`"pdf"`; `"toon"` requires a JSON schema. |
| `search` | `Union[SearchConfig, Dict]` | `None` | Search configuration (type, sources, dates, category). See the Search Configuration section for details. |
| `webhook_url` | `str` | `None` | HTTPS webhook URL for completion notification |
| `metadata` | `Dict[str, Union[str, int, bool]]` | `None` | Custom metadata (key-value pairs) |
Returns: BatchCreateResponse
Example:
```python
batch = client.batch.create(
    name="Market Research Q4 2024",
    mode="standard",
    output_formats=["markdown"],
    search={
        "search_type": "all",
        "included_sources": ["web", "academic"],
        "start_date": "2024-01-01",
        "end_date": "2024-12-31",
    },
    metadata={"project": "Q4-2024", "team": "research"},
)

if batch.success:
    print(f"Batch created: {batch.batch_id}")
else:
    print(f"Error: {batch.error}")
```
Mode Values
The mode parameter accepts the following values:

- `"standard"` (default): Standard research mode
- `"heavy"`: Comprehensive research mode
- `"fast"`: Fast research mode (faster completion)

The `lite` mode has been replaced by `fast`.

The output_formats parameter accepts:

- `"markdown"`: Markdown text output (default)
- `"pdf"`: PDF document output
- `"toon"`: TOON format (requires a JSON schema)
- JSON schema object: Structured output matching the provided schema

Important notes:

- A JSON schema cannot be mixed with `"markdown"` or `"pdf"`. Use one or the other.
- `"toon"` requires a JSON schema to be provided.
add_tasks()
Add tasks to an existing batch. Tasks inherit the batch's default settings; the strategy, urls, and metadata fields can be set individually per task (see Limitations for which settings cannot be overridden).
Parameters:
| Parameter | Type | Description |
| --- | --- | --- |
| `batch_id` | `str` | Batch ID to add tasks to |
| `tasks` | `List[Union[BatchTaskInput, Dict]]` | List of task inputs |
Task Input Structure:
Each task can be a dictionary or a BatchTaskInput object with:

- `id` (optional): User-provided task ID
- `query` (required): Research query or task description
- `strategy` (optional): Natural language research strategy
- `urls` (optional): URLs to extract and analyze
- `metadata` (optional): Custom metadata for this task
Returns: BatchAddTasksResponse
Example:
```python
from valyu.types.deepresearch import BatchTaskInput

# Using dictionaries
tasks = [
    {"query": "What are the latest trends in AI?"},
    {"query": "Summarize recent developments in quantum computing"},
    {"query": "What is the current state of renewable energy?"},
]

# Or using BatchTaskInput objects
tasks = [
    BatchTaskInput(
        id="task-1",
        query="Analyze OpenAI's latest product launches",
        strategy="Focus on technical capabilities and market impact",
        urls=["https://openai.com/blog"],
    ),
    BatchTaskInput(
        id="task-2",
        query="Analyze Anthropic's Claude AI capabilities",
        strategy="Focus on safety features and enterprise adoption",
    ),
]

response = client.batch.add_tasks(batch_id, tasks)
if response.success:
    print(f"Added {response.added} tasks")
    if response.tasks:
        print(f"Created tasks: {[t.deepresearch_id for t in response.tasks]}")
    if response.counts:
        print(f"Batch counts: {response.counts.total} total, {response.counts.completed} completed")
```
status()
Get the current status of a batch, including task counts and cost information.
Parameters:
| Parameter | Type | Description |
| --- | --- | --- |
| `batch_id` | `str` | Batch ID to check |
Returns: BatchStatusResponse
Response Structure:
```python
{
    "success": True,
    "batch": {
        "batch_id": "batch_123",
        "name": "Market Research",
        "status": "processing",
        "mode": "standard",
        "output_formats": ["markdown"],
        "search_params": {
            "search_type": "all"
        },
        "counts": {
            "total": 10,
            "queued": 2,
            "running": 3,
            "completed": 4,
            "failed": 1,
            "cancelled": 0
        },
        "cost": 0.22,
        "created_at": "2025-01-15T10:30:00.000Z",
        "completed_at": None
    }
}
```
Example:
```python
status = client.batch.status(batch_id)
if status.success and status.batch:
    batch = status.batch
    print(f"Batch: {batch.batch_id}")
    print(f"Status: {batch.status}")
    print(f"Total tasks: {batch.counts.total}")
    print(f"Completed: {batch.counts.completed}")
    print(f"Running: {batch.counts.running}")
```
list_tasks()
List all tasks in a batch with their individual statuses.
Parameters:
| Parameter | Type | Description |
| --- | --- | --- |
| `batch_id` | `str` | Batch ID to list tasks for |
Returns: BatchTasksListResponse
Example:
```python
response = client.batch.list_tasks(batch_id)
if response.success and response.tasks:
    for task in response.tasks:
        print(f"Task ID: {task.task_id or task.deepresearch_id}")
        print(f"Query: {task.query}")
        print(f"Status: {task.status}")
    if response.pagination:
        print(f"Pagination: {response.pagination.count} items, has_more: {response.pagination.has_more}")
```
cancel()
Cancel a batch and all its pending/running tasks.
Parameters:
| Parameter | Type | Description |
| --- | --- | --- |
| `batch_id` | `str` | Batch ID to cancel |
Returns: BatchCancelResponse
Example:
```python
response = client.batch.cancel(batch_id)
if response.success:
    print(f"Batch cancelled: {response.message}")
```
list()
List all batches for your account.
Parameters:
| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `limit` | `int` | `10` | Maximum number of batches to return (max: 100) |
Returns: BatchListResponse
Example:
```python
response = client.batch.list(limit=20)
if response.success and response.batches:
    for batch in response.batches:
        print(f"Batch: {batch.batch_id}")
        print(f"Name: {batch.name or 'Unnamed'}")
        print(f"Status: {batch.status}")
        print(f"Tasks: {batch.counts.total} total, {batch.counts.completed} completed")
```
wait_for_completion()
Wait for a batch to complete with automatic polling. This method blocks until the batch reaches a terminal state.
Parameters:
| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `batch_id` | `str` | required | Batch ID to wait for |
| `poll_interval` | `int` | `10` | Seconds between polls |
| `max_wait_time` | `int` | `14400` | Maximum wait time in seconds (default: 4 hours) |
| `on_progress` | `Callable[[BatchStatusResponse], None]` | `None` | Callback for progress updates |
Returns: BatchStatusResponse (final status)
Raises:

- `TimeoutError`: If `max_wait_time` is exceeded
- `ValueError`: If the batch fails or is cancelled
Example:
```python
def on_progress(status):
    if status.success and status.batch:
        counts = status.batch.counts
        print(
            f"Progress: {counts.completed + counts.failed + counts.cancelled}/{counts.total} "
            f"(Running: {counts.running}, Queued: {counts.queued})"
        )

try:
    final_status = client.batch.wait_for_completion(
        batch_id,
        poll_interval=10,
        max_wait_time=3600,  # 1 hour
        on_progress=on_progress,
    )
    if final_status.success and final_status.batch:
        print("Batch completed!")
        print(f"Final status: {final_status.batch.status}")
except TimeoutError as e:
    print(f"Timeout: {e}")
except ValueError as e:
    print(f"Error: {e}")
```
create_and_run()
Convenience method to create a batch and add tasks in one call. Optionally waits for completion.
Parameters:
All parameters from create() plus:
| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `tasks` | `List[Union[BatchTaskInput, Dict]]` | required | List of task inputs |
| `wait` | `bool` | `False` | If True, wait for the batch to complete before returning |
| `poll_interval` | `int` | `10` | Seconds between polls when waiting |
| `max_wait_time` | `int` | `14400` | Maximum wait time in seconds |
| `on_progress` | `Callable[[BatchStatusResponse], None]` | `None` | Callback for progress updates |
Returns: BatchCreateResponse
Example:
```python
tasks = [
    {"query": "What is the latest in generative AI?"},
    {"query": "Summarize recent ML frameworks"},
    {"query": "What are the top AI startups in 2024?"},
]

# Create and add tasks (don't wait)
batch = client.batch.create_and_run(
    tasks=tasks,
    name="Quick Research Batch",
    mode="standard",
    wait=False,
)

# Or create, add tasks, and wait for completion
batch = client.batch.create_and_run(
    tasks=tasks,
    name="Quick Research Batch",
    mode="standard",
    wait=True,
    poll_interval=10,
    max_wait_time=3600,
    on_progress=on_progress,
)
```
Search Configuration
The search parameter controls which data sources are queried. See the Batch Processing Guide for complete documentation of all search options.
```python
from valyu.types.deepresearch import SearchConfig

# Using SearchConfig (recommended)
search_config = SearchConfig(
    search_type="all",
    included_sources=["academic", "web"],
    start_date="2024-01-01",
    end_date="2024-12-31",
)

batch = client.batch.create(
    name="Academic Research",
    mode="standard",
    search=search_config,
)

# Or using a dictionary
batch = client.batch.create(
    name="Research Batch",
    search={
        "search_type": "proprietary",
        "included_sources": ["academic", "finance"],
        "start_date": "2024-01-01",
    },
)
```
Batch-level search parameters are inherited by all tasks and cannot be overridden per-task.
Complete Workflow Example
Here’s a complete example showing the typical batch workflow:
```python
import os

from valyu import Valyu
from valyu.types.deepresearch import BatchTaskInput, SearchConfig

client = Valyu(api_key=os.getenv("VALYU_API_KEY"))

# 1. Create a batch with default settings
search_config = SearchConfig(
    search_type="all",
    included_sources=["web", "academic"],
    start_date="2024-01-01",
)

batch = client.batch.create(
    name="Market Research Q4 2024",
    mode="standard",
    output_formats=["markdown"],
    search=search_config,
    metadata={"project": "Q4-2024", "team": "research"},
)

if not batch.success:
    print(f"Error creating batch: {batch.error}")
    exit(1)

batch_id = batch.batch_id
print(f"Created batch: {batch_id}")

# 2. Add tasks to the batch
tasks = [
    BatchTaskInput(
        id="competitor-1",
        query="Analyze OpenAI's latest product launches",
        strategy="Focus on technical capabilities and market impact",
        urls=["https://openai.com/blog"],
    ),
    BatchTaskInput(
        id="competitor-2",
        query="Analyze Anthropic's Claude AI capabilities",
        strategy="Focus on safety features and enterprise adoption",
    ),
    BatchTaskInput(
        id="trends-1",
        query="What are the latest trends in AI?",
    ),
]

add_response = client.batch.add_tasks(batch_id, tasks)
if not add_response.success:
    print(f"Error adding tasks: {add_response.error}")
    exit(1)

print(f"Added {add_response.added} tasks")

# 3. Monitor progress
def on_progress(status):
    if status.success and status.batch:
        counts = status.batch.counts
        print(
            f"Progress: {counts.completed + counts.failed}/{counts.total} "
            f"(Running: {counts.running}, Queued: {counts.queued})"
        )

# 4. Wait for completion
try:
    final_status = client.batch.wait_for_completion(
        batch_id,
        poll_interval=10,
        max_wait_time=3600,
        on_progress=on_progress,
    )
    if final_status.success and final_status.batch:
        batch_info = final_status.batch
        print("\nBatch completed!")
        print(f"Status: {batch_info.status}")
        print(f"Total tasks: {batch_info.counts.total}")
        print(f"Completed: {batch_info.counts.completed}")
        print(f"Failed: {batch_info.counts.failed}")

        # 5. List all tasks to see results
        tasks_response = client.batch.list_tasks(batch_id)
        if tasks_response.success and tasks_response.tasks:
            print("\nTask Details:")
            for task in tasks_response.tasks:
                print(f"  - {task.task_id or task.deepresearch_id}")
                print(f"    Query: {task.query}")
                print(f"    Status: {task.status}")
            if tasks_response.pagination:
                print(f"\nPagination: {tasks_response.pagination.count} items")
                if tasks_response.pagination.has_more:
                    print(f"  More items available (last_key: {tasks_response.pagination.last_key})")
except TimeoutError as e:
    print(f"Timeout: {e}")
except ValueError as e:
    print(f"Error: {e}")
```
Response Types
```python
# BatchCreateResponse
{
    "success": bool,
    "batch_id": Optional[str],
    "name": Optional[str],
    "status": Optional[BatchStatus],
    "mode": Optional[DeepResearchMode],
    "output_formats": Optional[List[...]],
    "search_params": Optional[Dict[str, Any]],
    "counts": Optional[BatchCounts],
    "cost": Optional[float],
    "created_at": Optional[str],
    "completed_at": Optional[str],
    "webhook_secret": Optional[str],  # Only on creation
    "error": Optional[str]
}
```
```python
# BatchAddTasksResponse
{
    "success": bool,
    "batch_id": Optional[str],
    "added": Optional[int],
    "tasks": Optional[List[BatchTaskCreated]],
    "counts": Optional[BatchCounts],
    "error": Optional[str]
}

# BatchTaskCreated
{
    "task_id": Optional[str],
    "deepresearch_id": str,
    "status": str
}
```
```python
# BatchStatusResponse
{
    "success": bool,
    "batch": Optional[DeepResearchBatch],
    "error": Optional[str]
}
```
```python
# BatchTasksListResponse
{
    "success": bool,
    "batch_id": Optional[str],
    "tasks": Optional[List[BatchTaskListItem]],
    "pagination": Optional[BatchPagination],
    "error": Optional[str]
}

# BatchTaskListItem
{
    "task_id": Optional[str],
    "deepresearch_id": str,
    "query": str,
    "status": DeepResearchStatus,
    "created_at": Union[int, str],
    "completed_at": Optional[Union[int, str]]
}

# BatchPagination
{
    "count": int,
    "last_key": Optional[str],
    "has_more": bool
}
```
```python
# BatchListResponse
{
    "success": bool,
    "batches": Optional[List[DeepResearchBatch]],
    "error": Optional[str]
}
```
Best Practices
- **Use descriptive batch names**: Make it easy to identify batches later
- **Set appropriate defaults**: Configure batch-level settings that apply to most tasks
- **Monitor progress**: Use wait_for_completion() with a progress callback for long-running batches
- **Handle errors gracefully**: Check success fields and handle errors appropriately
- **Use metadata**: Add metadata to batches and tasks for easier filtering and organization
- **Set webhooks**: Use webhook URLs for asynchronous completion notifications instead of polling
- **Organize tasks**: Use custom task IDs to map results back to your system
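The last point pays off when collecting results. A small sketch (index_tasks_by_id is a hypothetical helper, not an SDK method; task objects follow the BatchTaskListItem shape returned by list_tasks()):

```python
def index_tasks_by_id(tasks):
    """Map user-provided task IDs (falling back to deepresearch_id) to task objects."""
    return {(t.task_id or t.deepresearch_id): t for t in tasks}

# tasks_response = client.batch.list_tasks(batch_id)
# by_id = index_tasks_by_id(tasks_response.tasks)
# by_id["competitor-1"].status  # look up a result by your own ID
```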
Error Handling
All batch methods return response objects with a success field. Always check this before proceeding:
```python
response = client.batch.create(...)
if not response.success:
    print(f"Error: {response.error}")
    # Handle error appropriately
    return

# Proceed with successful response
batch_id = response.batch_id
```
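For transient failures, you can wrap any batch call in a simple retry loop. This is an illustrative pattern, not an SDK feature; with_retries and its parameters are hypothetical names:

```python
import time

def with_retries(call, attempts=3, base_delay=1.0):
    """Retry a zero-argument call while response.success is False, with exponential backoff."""
    for attempt in range(attempts):
        response = call()
        if response.success:
            return response
        if attempt < attempts - 1:
            time.sleep(base_delay * (2 ** attempt))  # 1s, 2s, 4s, ...
    return response  # last failing response; caller inspects .error

# response = with_retries(lambda: client.batch.create(name="Research Batch"))
```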
Webhooks
You can configure a webhook URL when creating a batch to receive notifications when the batch completes:
```python
batch = client.batch.create(
    name="Research Batch",
    mode="standard",
    webhook_url="https://your-domain.com/webhook",
)
```
The webhook will receive a POST request when the batch reaches a terminal state (completed, completed_with_errors, or cancelled).
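Since the webhook payload shape is not documented on this page, a robust pattern is to treat the notification purely as a trigger and re-fetch authoritative state with status(). A sketch of that pattern (handle_webhook and the payload's batch_id field are assumptions, not part of the SDK):

```python
# Terminal states listed above
TERMINAL_STATES = {"completed", "completed_with_errors", "cancelled"}

def handle_webhook(payload, client):
    """Treat the webhook as a trigger only: re-fetch the batch's authoritative state."""
    batch_id = payload.get("batch_id")  # payload field name is an assumption
    if not batch_id:
        return None
    status = client.batch.status(batch_id)
    if status.success and status.batch and status.batch.status in TERMINAL_STATES:
        return status.batch
    return None
```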
Limitations
Batch tasks do not support: files, deliverables, mcp_servers, brand_collection_id, or previous_reports. Use client.deepresearch.create() for these features.
| Constraint | Value |
| --- | --- |
| Maximum tasks per request | 100 |
| Minimum tasks per request | 1 |
| Batch status to add tasks | `open` or `processing` |
- **Inherited settings** (cannot be overridden per-task): `mode`, `output_formats`, `search_params`
- **Per-task overrides allowed**: `strategy`, `urls`, `metadata`
See the Batch Processing Guide for complete details.
See Also