The Batch API allows you to run multiple deep research tasks in parallel, efficiently managing bulk research operations with shared configuration and progress monitoring.
Overview
The Batch API is designed for scenarios where you need to process many research queries simultaneously. Instead of creating individual deep research tasks, you can:
- Create a batch with shared settings (mode, search configuration, output formats)
- Add multiple tasks to the batch
- Monitor progress across all tasks
- Receive webhook notifications when batches complete
Key Concepts
Batch Lifecycle
A batch goes through the following states:
- open: Batch is created and ready to accept tasks
- processing: Batch has tasks that are queued or running
- completed: All tasks finished successfully
- completed_with_errors: Batch finished but some tasks failed
- cancelled: Batch was cancelled before completion
Task States
Individual tasks within a batch can be in these states:
- queued: Task is waiting to start
- running: Task is currently executing
- completed: Task finished successfully
- failed: Task encountered an error
- cancelled: Task was cancelled
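Since batches and tasks each have a small, fixed set of terminal states, a polling loop only needs to know when a status will no longer change. The helpers below are an illustrative sketch built from the state lists above; they are not part of the Valyu SDK:

```python
# Terminal states taken from the lifecycle lists above.
# Illustrative helpers, not part of the Valyu SDK.
BATCH_TERMINAL_STATES = {"completed", "completed_with_errors", "cancelled"}
TASK_TERMINAL_STATES = {"completed", "failed", "cancelled"}

def batch_is_done(status: str) -> bool:
    """Return True once a batch will no longer change state."""
    return status in BATCH_TERMINAL_STATES

def task_is_done(status: str) -> bool:
    """Return True once an individual task will no longer change state."""
    return status in TASK_TERMINAL_STATES
```

In practice you rarely need this yourself: wait_for_completion() (documented below) implements the same check internally.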
Initialization
The Batch API is accessed through the batch property of your Valyu client:
from valyu import Valyu
client = Valyu(api_key="your-api-key")
batch_client = client.batch
Methods
create()
Create a new batch with default settings that will apply to all tasks.
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| name | str | None | Optional name for the batch |
| mode | Literal["standard", "heavy", "fast"] | "standard" | Research mode: "standard" (default), "heavy" (comprehensive), or "fast" (faster completion). The lite mode has been replaced by fast. |
| output_formats | List[Union[Literal["markdown", "pdf", "toon"], Dict]] | None | Output formats: ["markdown"], ["pdf"], ["toon"], or a JSON schema object. Cannot mix a JSON schema with "markdown"/"pdf". "toon" requires a JSON schema. |
| search | Union[SearchConfig, Dict] | None | Search configuration (type, sources, dates, category). See the Search Configuration section for details. |
| webhook_url | str | None | HTTPS webhook URL for completion notifications |
| metadata | Dict[str, Union[str, int, bool]] | None | Custom metadata (key-value pairs) |
Returns: BatchCreateResponse
Example:
batch = client.batch.create(
name="Market Research Q4 2024",
mode="standard",
output_formats=["markdown"],
search={
"search_type": "all",
"included_sources": ["web", "academic"],
"start_date": "2024-01-01",
"end_date": "2024-12-31"
},
metadata={"project": "Q4-2024", "team": "research"}
)
if batch.success:
print(f"Batch created: {batch.batch_id}")
else:
print(f"Error: {batch.error}")
Mode Values
The mode parameter accepts the following values:
- "standard" (default): Standard research mode
- "heavy": Comprehensive research mode
- "fast": Fast research mode (faster completion)
The lite mode has been replaced by fast.
Output Format Values
The output_formats parameter accepts:
- "markdown": Markdown text output (default)
- "pdf": PDF document output
- "toon": TOON format (requires JSON schema)
- JSON Schema Object: Structured output matching the provided schema
Important Notes:
- Cannot mix a JSON schema with "markdown" or "pdf". Use one or the other.
- "toon" format requires a JSON schema to be provided.
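The two constraints above can be checked client-side before creating a batch. The following validator is a sketch of the documented rules, not an SDK function:

```python
def validate_output_formats(formats):
    """Check the documented output_formats constraints (illustrative helper).

    - A JSON schema (a dict entry) cannot be combined with "markdown" or "pdf".
    - "toon" requires a JSON schema to be present.
    """
    has_schema = any(isinstance(f, dict) for f in formats)
    has_text = any(f in ("markdown", "pdf") for f in formats)
    if has_schema and has_text:
        raise ValueError('Cannot mix a JSON schema with "markdown"/"pdf"')
    if "toon" in formats and not has_schema:
        raise ValueError('"toon" requires a JSON schema')
    return formats
```

Running this before batch.create() surfaces configuration mistakes locally instead of as an API error.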
add_tasks()
Add tasks to an existing batch. Tasks inherit the batch’s default settings but can override them individually.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| batch_id | str | Batch ID to add tasks to |
| tasks | List[Union[BatchTaskInput, Dict]] | List of task inputs |
Task Input Structure:
Each task can be a dictionary or BatchTaskInput object with:
- id (optional): User-provided task ID
- query (required): Research query or task description
- strategy (optional): Natural language research strategy
- urls (optional): URLs to extract and analyze
- metadata (optional): Custom metadata for this task
Returns: BatchAddTasksResponse
Example:
from valyu.types.deepresearch import BatchTaskInput
# Using dictionaries
tasks = [
{"query": "What are the latest trends in AI?"},
{"query": "Summarize recent developments in quantum computing"},
{"query": "What is the current state of renewable energy?"}
]
# Or using BatchTaskInput objects
tasks = [
BatchTaskInput(
id="task-1",
query="Analyze OpenAI's latest product launches",
strategy="Focus on technical capabilities and market impact",
urls=["https://openai.com/blog"],
),
BatchTaskInput(
id="task-2",
query="Analyze Anthropic's Claude AI capabilities",
strategy="Focus on safety features and enterprise adoption"
)
]
response = client.batch.add_tasks(batch_id, tasks)
if response.success:
print(f"Added {response.added} tasks")
if response.tasks:
print(f"Created tasks: {[t.deepresearch_id for t in response.tasks]}")
if response.counts:
print(f"Batch counts: {response.counts.total} total, {response.counts.completed} completed")
status()
Get the current status of a batch, including task counts and cost information.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| batch_id | str | Batch ID to check |
Returns: BatchStatusResponse
Response Structure:
{
"success": True,
"batch": {
"batch_id": "batch_123",
"name": "Market Research",
"status": "processing",
"mode": "standard",
"output_formats": ["markdown"],
"search_params": {
"search_type": "all"
},
"counts": {
"total": 10,
"queued": 2,
"running": 3,
"completed": 4,
"failed": 1,
"cancelled": 0
},
"cost": 0.22,
"created_at": "2025-01-15T10:30:00.000Z",
"completed_at": None
}
}
Example:
status = client.batch.status(batch_id)
if status.success and status.batch:
batch = status.batch
print(f"Batch: {batch.batch_id}")
print(f"Status: {batch.status}")
print(f"Total tasks: {batch.counts.total}")
print(f"Completed: {batch.counts.completed}")
print(f"Running: {batch.counts.running}")
list_tasks()
List all tasks in a batch with their individual statuses.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| batch_id | str | Batch ID to list tasks for |
Returns: BatchTasksListResponse
Example:
response = client.batch.list_tasks(batch_id)
if response.success and response.tasks:
for task in response.tasks:
print(f"Task ID: {task.task_id or task.deepresearch_id}")
print(f"Query: {task.query}")
print(f"Status: {task.status}")
if response.pagination:
print(f"Pagination: {response.pagination.count} items, has_more: {response.pagination.has_more}")
cancel()
Cancel a batch and all its pending/running tasks.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| batch_id | str | Batch ID to cancel |
Returns: BatchCancelResponse
Example:
response = client.batch.cancel(batch_id)
if response.success:
print(f"Batch cancelled: {response.message}")
list()
List all batches for your account.
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| limit | int | 10 | Maximum number of batches to return (max: 100) |
Returns: BatchListResponse
Example:
response = client.batch.list(limit=20)
if response.success and response.batches:
for batch in response.batches:
print(f"Batch: {batch.batch_id}")
print(f"Name: {batch.name or 'Unnamed'}")
print(f"Status: {batch.status}")
print(f"Tasks: {batch.counts.total} total, {batch.counts.completed} completed")
wait_for_completion()
Wait for a batch to complete with automatic polling. This method blocks until the batch reaches a terminal state.
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| batch_id | str | required | Batch ID to wait for |
| poll_interval | int | 10 | Seconds between polls |
| max_wait_time | int | 14400 | Maximum wait time in seconds (default: 4 hours) |
| on_progress | Callable[[BatchStatusResponse], None] | None | Callback for progress updates |
Returns: BatchStatusResponse (final status)
Raises:
- TimeoutError: If max_wait_time is exceeded
- ValueError: If the batch fails or is cancelled
Example:
def on_progress(status):
if status.success and status.batch:
counts = status.batch.counts
print(
f"Progress: {counts.completed + counts.failed + counts.cancelled}/{counts.total} "
f"(Running: {counts.running}, Queued: {counts.queued})"
)
try:
final_status = client.batch.wait_for_completion(
batch_id,
poll_interval=10,
max_wait_time=3600, # 1 hour
on_progress=on_progress
)
if final_status.success and final_status.batch:
print(f"Batch completed!")
print(f"Final status: {final_status.batch.status}")
except TimeoutError as e:
print(f"Timeout: {e}")
except ValueError as e:
print(f"Error: {e}")
create_and_run()
Convenience method to create a batch and add tasks in one call. Optionally waits for completion.
Parameters:
All parameters from create() plus:
| Parameter | Type | Default | Description |
|---|---|---|---|
| tasks | List[Union[BatchTaskInput, Dict]] | required | List of task inputs |
| wait | bool | False | If True, wait for the batch to complete before returning |
| poll_interval | int | 10 | Seconds between polls when waiting |
| max_wait_time | int | 14400 | Maximum wait time in seconds |
| on_progress | Callable[[BatchStatusResponse], None] | None | Callback for progress updates |
Returns: BatchCreateResponse
Example:
tasks = [
{"query": "What is the latest in generative AI?"},
{"query": "Summarize recent ML frameworks"},
{"query": "What are the top AI startups in 2024?"}
]
# Create and add tasks (don't wait)
batch = client.batch.create_and_run(
tasks=tasks,
name="Quick Research Batch",
mode="standard",
wait=False
)
# Or create, add tasks, and wait for completion
batch = client.batch.create_and_run(
tasks=tasks,
name="Quick Research Batch",
mode="standard",
wait=True,
poll_interval=10,
max_wait_time=3600,
on_progress=on_progress
)
Search Configuration
Search parameters control which data sources are queried, what content is included/excluded, and how results are filtered by date or category. When set at the batch level, these parameters are applied to all tasks in the batch and cannot be overridden by individual tasks.
Using SearchConfig (Recommended)
from valyu.types.deepresearch import SearchConfig
search_config = SearchConfig(
search_type="all",
included_sources=["academic", "web"],
start_date="2024-01-01",
end_date="2024-12-31",
excluded_sources=["patent"]
)
batch = client.batch.create(
name="Academic Research Q4 2024",
mode="standard",
search=search_config
)
Using Dictionary
batch = client.batch.create(
name="Competitor Analysis",
mode="standard",
search={
"search_type": "all",
"included_sources": ["web", "finance"],
"start_date": "2024-01-01",
"end_date": "2024-12-31",
"excluded_sources": ["patent"]
}
)
Search Type
Controls which backend search systems are queried for all tasks in the batch:
- "all" (default): Searches both web and proprietary data sources
- "web": Searches only web sources (general web search, news, articles)
- "proprietary": Searches only proprietary data sources (academic papers, finance data, patents, etc.)
When set at the batch level, this parameter cannot be overridden by individual tasks.
batch = client.batch.create(
name="Academic Research Batch",
search={"search_type": "proprietary"}
)
Included Sources
Restricts search to the specified source types for all tasks in the batch; no other sources will be queried. Tasks inherit this setting and cannot override it.
Available source types:
- "web": General web search results (news, articles, websites)
- "academic": Academic papers and research databases (ArXiv, PubMed, BioRxiv/MedRxiv, clinical trials, FDA drug labels, WHO health data, NIH grants, Wikipedia)
- "finance": Financial and economic data (stock/crypto/FX prices, SEC filings, company financial statements, economic indicators, prediction markets)
- "patent": Patent and intellectual property data (USPTO patent database; patent abstracts, claims, descriptions)
- "transportation": Transit and transportation data (UK National Rail schedules, maritime vessel tracking)
- "politics": Government and parliamentary data (UK Parliament members, bills, votes)
- "legal": Case law and legal data (UK court judgments, legislation text)
batch = client.batch.create(
name="Academic Research Batch",
search={
"search_type": "proprietary",
"included_sources": ["academic", "web"]
}
)
Excluded Sources
Excludes specific source types from search results for all tasks in the batch. Uses the same source type values as included_sources. Cannot be used simultaneously with included_sources (use one or the other).
batch = client.batch.create(
name="Research Batch",
search={
"search_type": "proprietary",
"excluded_sources": ["web", "patent"]
}
)
Start Date
Format: ISO date format (YYYY-MM-DD)
Filters search results to only include content published or dated on or after this date for all tasks in the batch. Applied to both publication dates and event dates when available. Works across all source types.
batch = client.batch.create(
name="2024 Research",
search={"start_date": "2024-01-01"}
)
End Date
Format: ISO date format (YYYY-MM-DD)
Filters search results to only include content published or dated on or before this date for all tasks in the batch. Applied to both publication dates and event dates when available. Works across all source types.
batch = client.batch.create(
name="Q4 2024 Analysis",
search={
"start_date": "2024-10-01",
"end_date": "2024-12-31"
}
)
Category
Filters results by a specific category for all tasks in the batch. Category values are source-dependent and may not apply to all source types.
batch = client.batch.create(
name="Technology Research",
search={"category": "technology"}
)
How Batch Search Parameters Work
- All tasks inherit batch search parameters: When you add tasks to a batch, they automatically inherit the batch’s search configuration
- Consistent search behavior: All tasks in the batch will use the same search configuration
- Cannot override per-task: Individual tasks in a batch cannot override the batch-level search parameters (they inherit them)
Important Notes
Parameter Enforcement
Batch-level parameters are enforced and cannot be overridden by individual tasks. This ensures consistent search behavior across all tasks in the batch. Tool-level source specifications are ignored if batch-level sources are specified.
Date Filtering
Dates are applied to both publication dates and event dates when available. ISO format (YYYY-MM-DD) is required. Date filtering works across all source types. If only start_date is provided, results include all content from that date forward. If only end_date is provided, results include all content up to that date. Both dates can be combined for a specific date range.
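The filtering semantics above (inclusive ISO date bounds, each bound optional) amount to a predicate like the following sketch. The actual filtering happens server-side; this is only a local illustration of the rules:

```python
from datetime import date

def in_date_range(published, start_date=None, end_date=None):
    """Mirror the documented date filter: inclusive bounds, each optional.

    All dates are ISO format (YYYY-MM-DD) strings. Illustrative only;
    the real filtering is applied by the API.
    """
    d = date.fromisoformat(published)
    if start_date is not None and d < date.fromisoformat(start_date):
        return False
    if end_date is not None and d > date.fromisoformat(end_date):
        return False
    return True
```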
Complete Workflow Example
Here’s a complete example showing the typical batch workflow:
from valyu import Valyu
from valyu.types.deepresearch import BatchTaskInput, SearchConfig
import os
client = Valyu(api_key=os.getenv("VALYU_API_KEY"))
# 1. Create a batch with default settings
search_config = SearchConfig(
search_type="all",
included_sources=["web", "academic"],
start_date="2024-01-01"
)
batch = client.batch.create(
name="Market Research Q4 2024",
mode="standard",
output_formats=["markdown"],
search=search_config,
metadata={"project": "Q4-2024", "team": "research"}
)
if not batch.success:
print(f"Error creating batch: {batch.error}")
exit(1)
batch_id = batch.batch_id
print(f"Created batch: {batch_id}")
# 2. Add tasks to the batch
tasks = [
BatchTaskInput(
id="competitor-1",
query="Analyze OpenAI's latest product launches",
strategy="Focus on technical capabilities and market impact",
urls=["https://openai.com/blog"],
),
BatchTaskInput(
id="competitor-2",
query="Analyze Anthropic's Claude AI capabilities",
strategy="Focus on safety features and enterprise adoption",
),
BatchTaskInput(
id="trends-1",
query="What are the latest trends in AI?",
)
]
add_response = client.batch.add_tasks(batch_id, tasks)
if not add_response.success:
print(f"Error adding tasks: {add_response.error}")
exit(1)
print(f"Added {add_response.added} tasks")
# 3. Monitor progress
def on_progress(status):
if status.success and status.batch:
counts = status.batch.counts
print(
f"Progress: {counts.completed + counts.failed}/{counts.total} "
f"(Running: {counts.running}, Queued: {counts.queued})"
)
# 4. Wait for completion
try:
final_status = client.batch.wait_for_completion(
batch_id,
poll_interval=10,
max_wait_time=3600,
on_progress=on_progress
)
if final_status.success and final_status.batch:
batch_info = final_status.batch
print(f"\nBatch completed!")
print(f"Status: {batch_info.status}")
print(f"Total tasks: {batch_info.counts.total}")
print(f"Completed: {batch_info.counts.completed}")
print(f"Failed: {batch_info.counts.failed}")
# 5. List all tasks to see results
tasks_response = client.batch.list_tasks(batch_id)
if tasks_response.success and tasks_response.tasks:
print("\nTask Details:")
for task in tasks_response.tasks:
print(f" - {task.task_id or task.deepresearch_id}")
print(f" Query: {task.query}")
print(f" Status: {task.status}")
if tasks_response.pagination:
print(f"\nPagination: {tasks_response.pagination.count} items")
if tasks_response.pagination.has_more:
print(f" More items available (last_key: {tasks_response.pagination.last_key})")
except TimeoutError as e:
print(f"Timeout: {e}")
except ValueError as e:
print(f"Error: {e}")
Response Types
BatchCreateResponse
{
"success": bool,
"batch_id": Optional[str],
"name": Optional[str], # Batch name
"status": Optional[BatchStatus],
"mode": Optional[DeepResearchMode], # Research mode (renamed from 'model')
"output_formats": Optional[List[...]], # Output formats
"search_params": Optional[Dict[str, Any]], # Search parameters
"counts": Optional[BatchCounts],
"cost": Optional[float], # Total cost in dollars (replaces 'usage' object)
"created_at": Optional[str], # ISO 8601 timestamp string
"completed_at": Optional[str], # ISO 8601 timestamp string (if completed)
"webhook_secret": Optional[str], # Only returned on batch creation
"message": Optional[str],
"error": Optional[str]
}
BatchAddTasksResponse
{
"success": bool,
"batch_id": Optional[str],
"added": Optional[int], # Number of tasks successfully added
"tasks": Optional[List[BatchTaskCreated]], # Array of created task objects
"counts": Optional[BatchCounts], # Updated task counts for the batch
"message": Optional[str],
"error": Optional[str]
}
# BatchTaskCreated structure
{
"task_id": Optional[str], # User-provided task identifier (if specified)
"deepresearch_id": str, # DeepResearch task ID
"status": str # Task status
}
BatchStatusResponse
{
"success": bool,
"batch": Optional[DeepResearchBatch],
"error": Optional[str]
}
BatchTasksListResponse
{
"success": bool,
"batch_id": Optional[str],
"tasks": Optional[List[BatchTaskListItem]],
"pagination": Optional[BatchPagination], # Pagination information
"error": Optional[str]
}
# BatchTaskListItem structure
{
"task_id": Optional[str], # User-provided task identifier
"deepresearch_id": str, # DeepResearch task ID
"query": str, # The research query
"status": DeepResearchStatus, # Task status
"created_at": Union[int, str], # ISO 8601 string or epoch timestamp
"completed_at": Optional[Union[int, str]] # ISO 8601 string or epoch timestamp
}
# BatchPagination structure
{
"count": int, # Number of tasks returned in this response
"last_key": Optional[str], # Pagination key for fetching next page (if has_more is true)
"has_more": bool # Whether there are more tasks to fetch
}
BatchListResponse
{
"success": bool,
"batches": Optional[List[DeepResearchBatch]],
"error": Optional[str]
}
Best Practices
- Use descriptive batch names: Make it easy to identify batches later
- Set appropriate defaults: Configure batch-level settings that apply to most tasks
- Monitor progress: Use wait_for_completion() with progress callbacks for long-running batches
- Handle errors gracefully: Check success fields and handle errors appropriately
- Use metadata: Add metadata to batches and tasks for easier filtering and organization
- Set webhooks: Use webhook URLs for async completion notifications instead of polling
- Task organization: Use custom task IDs to map results back to your system
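On the last point: add_tasks() echoes your custom id back as task_id alongside the generated deepresearch_id (see the BatchTaskCreated structure below), so results can be joined back to your own records. A minimal sketch of that join, operating on plain dicts shaped like BatchTaskCreated:

```python
def map_tasks_to_local(created_tasks, local_records):
    """Join created batch tasks back to local records by custom task ID.

    created_tasks: list of dicts shaped like BatchTaskCreated
                   ({"task_id", "deepresearch_id", "status"})
    local_records: dict keyed by the custom IDs you supplied
    Illustrative helper, not part of the SDK.
    """
    joined = {}
    for task in created_tasks:
        local = local_records.get(task.get("task_id"))
        if local is not None:
            joined[task["deepresearch_id"]] = local
    return joined
```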
Error Handling
All batch methods return response objects with a success field. Always check this before proceeding:
response = client.batch.create(...)
if not response.success:
print(f"Error: {response.error}")
# Handle error appropriately
return
# Proceed with successful response
batch_id = response.batch_id
Webhooks
You can configure a webhook URL when creating a batch to receive notifications when the batch completes:
batch = client.batch.create(
name="Research Batch",
mode="standard",
webhook_url="https://your-domain.com/webhook"
)
The webhook will receive a POST request when the batch reaches a terminal state (completed, completed_with_errors, or cancelled).
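The webhook_secret returned on batch creation can be used to authenticate incoming webhook requests. The exact scheme (header name, algorithm, payload encoding) is not specified here and should be confirmed in the webhook documentation; a common pattern, shown as an assumption below, is an HMAC-SHA256 hex digest of the raw request body:

```python
import hashlib
import hmac

def verify_webhook_signature(secret: str, raw_body: bytes, signature: str) -> bool:
    """Compare a received signature against an HMAC-SHA256 of the body.

    Assumes an HMAC-SHA256 hex-digest scheme; confirm the actual header
    name and algorithm in the Valyu webhook documentation.
    """
    expected = hmac.new(secret.encode(), raw_body, hashlib.sha256).hexdigest()
    # compare_digest avoids leaking timing information
    return hmac.compare_digest(expected, signature)
```

In your webhook handler, verify the signature before trusting the payload, and respond quickly (e.g. enqueue processing) so the delivery is not retried.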
Limitations
Not Yet Supported in Batch API
The following features are not yet supported in the batch API:
- deliverables: Cannot specify deliverables (CSV, XLSX, PPTX, DOCX) for batch tasks
- brand_collection_id: Cannot apply branding to batch tasks
- files: Cannot attach files to batch tasks
- mcp_servers: Cannot configure MCP servers for batch tasks
- code_execution: Always enabled (cannot disable per batch)
- previous_reports: Cannot reference previous reports in batch tasks
- alert_email: Cannot set email alerts for batch tasks
Workaround: Use individual task creation (client.deepresearch.create()) if you need these features.
Task Constraints
- Maximum tasks per request: 100
- Minimum tasks per request: 1
- Batch status: Batch must be in "open" or "processing" status to add tasks
- Batch ownership: You must own the batch (created with your API key)
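Given the 100-task ceiling per request, larger task lists can be split into chunks before calling add_tasks(). A sketch of that wrapper (the limit comes from the constraints above; batch_client stands for client.batch):

```python
def add_tasks_chunked(batch_client, batch_id, tasks, chunk_size=100):
    """Add tasks in chunks of at most 100 per request (the documented max).

    Illustrative wrapper around batch_client.add_tasks; returns the list
    of per-chunk responses so callers can check each success field.
    """
    responses = []
    for i in range(0, len(tasks), chunk_size):
        responses.append(batch_client.add_tasks(batch_id, tasks[i:i + chunk_size]))
    return responses
```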
Inherited Settings
Tasks automatically inherit from the batch:
- mode: Research mode
- output_formats: Output formats
- search_params: Search parameters (search_type, included_sources, etc.)
Per-Task Overrides
Tasks can override some settings:
- strategy: Custom research instructions (per-task)
- urls: URLs to analyze (per-task)
- metadata: Custom metadata (per-task)
Note: Tasks cannot override:
- mode (inherited from batch)
- output_formats (inherited from batch)
- search_params (inherited from batch)
See Also