The Batch API allows you to run multiple deep research tasks in parallel with shared configuration and progress monitoring.
For a conceptual overview, the batch lifecycle, and best practices, see the Batch Processing Guide. This page is the Python SDK method reference.

Quick Start

from valyu import Valyu

client = Valyu()

# Create batch, add tasks, wait for completion
batch = client.batch.create(name="Research Batch", mode="standard")
client.batch.add_tasks(batch.batch_id, [
    {"query": "Research AI trends"},
    {"query": "Analyze market data"}
])
result = client.batch.wait_for_completion(batch.batch_id)
print(f"Completed: {result.batch.counts.completed} tasks")

Initialization

The Batch API is accessed through the batch property of your Valyu client:
from valyu import Valyu

client = Valyu(api_key="your-api-key")
batch_client = client.batch

Methods

create()

Create a new batch with default settings that will apply to all tasks. Parameters:
| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| name | str | None | Optional name for the batch |
| mode | Literal["fast", "standard", "heavy", "max"] | "standard" | Research mode: "standard" (default), "heavy" (comprehensive), or "fast" (faster completion). The lite mode is deprecated and maps to standard. |
| output_formats | List[Union[Literal["markdown", "pdf", "toon"], Dict]] | None | Output formats: ["markdown"], ["pdf"], ["toon"], or a JSON schema object. Cannot mix a JSON schema with "markdown"/"pdf"; "toon" requires a JSON schema. |
| search | Union[SearchConfig, Dict] | None | Search configuration (type, sources, dates, category). See the Search Configuration section for details. |
| webhook_url | str | None | HTTPS webhook URL for completion notification |
| metadata | Dict[str, Union[str, int, bool]] | None | Custom metadata (key-value pairs) |

Returns: BatchCreateResponse

Example:
batch = client.batch.create(
    name="Market Research Q4 2024",
    mode="standard",
    output_formats=["markdown"],
    search={
        "search_type": "all",
        "included_sources": ["web", "academic"],
        "start_date": "2024-01-01",
        "end_date": "2024-12-31"
    },
    metadata={"project": "Q4-2024", "team": "research"}
)

if batch.success:
    print(f"Batch created: {batch.batch_id}")
else:
    print(f"Error: {batch.error}")

Mode Values

The mode parameter accepts the following values:
  • "standard" (default): Standard research mode
  • "heavy": Comprehensive research mode
  • "fast": Fast research mode (faster completion)
The lite mode is deprecated and maps to standard.

Output Formats

The output_formats parameter accepts:
  • "markdown": Markdown text output (default)
  • "pdf": PDF document output
  • "toon": TOON format (requires JSON schema)
  • JSON Schema Object: Structured output matching the provided schema
Important Notes:
  • Cannot mix JSON schema with "markdown" or "pdf". Use one or the other.
  • "toon" format requires a JSON schema to be provided.
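For structured output, pass a JSON schema object in output_formats. The schema below is a minimal sketch; the field names ("summary", "key_findings") are illustrative, not part of the API:

```python
# Illustrative JSON schema for structured output -- the field names
# ("summary", "key_findings") are examples, not part of the API.
report_schema = {
    "type": "object",
    "properties": {
        "summary": {"type": "string"},
        "key_findings": {"type": "array", "items": {"type": "string"}},
    },
    "required": ["summary"],
}
```

Pass it as output_formats=[report_schema] when calling client.batch.create(...). Remember that a schema cannot be combined with "markdown" or "pdf" in the same list.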

add_tasks()

Add tasks to an existing batch. Tasks inherit the batch’s default settings but can override them individually. Parameters:
| Parameter | Type | Description |
| --- | --- | --- |
| batch_id | str | Batch ID to add tasks to |
| tasks | List[Union[BatchTaskInput, Dict]] | List of task inputs |
Task Input Structure: Each task can be a dictionary or BatchTaskInput object with:
  • id (optional): User-provided task ID
  • query (required): Research query or task description
  • strategy (optional): Natural language research strategy
  • urls (optional): URLs to extract and analyze
  • metadata (optional): Custom metadata for this task
Returns: BatchAddTasksResponse

Example:
from valyu.types.deepresearch import BatchTaskInput

# Using dictionaries
tasks = [
    {"query": "What are the latest trends in AI?"},
    {"query": "Summarize recent developments in quantum computing"},
    {"query": "What is the current state of renewable energy?"}
]

# Or using BatchTaskInput objects
tasks = [
    BatchTaskInput(
        id="task-1",
        query="Analyze OpenAI's latest product launches",
        strategy="Focus on technical capabilities and market impact",
        urls=["https://openai.com/blog"],
    ),
    BatchTaskInput(
        id="task-2",
        query="Analyze Anthropic's Claude AI capabilities",
        strategy="Focus on safety features and enterprise adoption"
    )
]

response = client.batch.add_tasks(batch_id, tasks)

if response.success:
    print(f"Added {response.added} tasks")
    if response.tasks:
        print(f"Created tasks: {[t.deepresearch_id for t in response.tasks]}")
    if response.counts:
        print(f"Batch counts: {response.counts.total} total, {response.counts.completed} completed")

status()

Get the current status of a batch, including task counts and cost information. Parameters:
| Parameter | Type | Description |
| --- | --- | --- |
| batch_id | str | Batch ID to check |

Returns: BatchStatusResponse

Response Structure:
{
    "success": True,
    "batch": {
        "batch_id": "batch_123",
        "name": "Market Research",
        "status": "processing",
        "mode": "standard",
        "output_formats": ["markdown"],
        "search_params": {
            "search_type": "all"
        },
        "counts": {
            "total": 10,
            "queued": 2,
            "running": 3,
            "completed": 4,
            "failed": 1,
            "cancelled": 0
        },
        "cost": 0.22,
        "created_at": "2025-01-15T10:30:00.000Z",
        "completed_at": None
    }
}
Example:
status = client.batch.status(batch_id)

if status.success and status.batch:
    batch = status.batch
    print(f"Batch: {batch.batch_id}")
    print(f"Status: {batch.status}")
    print(f"Total tasks: {batch.counts.total}")
    print(f"Completed: {batch.counts.completed}")
    print(f"Running: {batch.counts.running}")

list_tasks()

List all tasks in a batch with their individual statuses. Pass include_output=True to get full output, sources, images, and cost for each task. Parameters:
| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| batch_id | str | required | Batch ID to list tasks for |
| status | str | None | Filter by status: "completed", "failed", "cancelled", "running", "queued" |
| limit | int | 25 | Results per page (max: 50) |
| last_key | str | None | Pagination cursor from previous response |
| include_output | bool | False | Include full output, sources, images, and cost for each task |

Returns: BatchTasksListResponse

Example:
# Lightweight listing (status only)
response = client.batch.list_tasks(batch_id)

if response.success and response.tasks:
    for task in response.tasks:
        print(f"Task ID: {task.task_id or task.deepresearch_id}")
        print(f"Query: {task.query}")
        print(f"Status: {task.status}")

# Get full output for completed tasks
results = client.batch.list_tasks(batch_id, status="completed", include_output=True)

for task in results.tasks:
    print(f"Task: {task.task_id or task.deepresearch_id}")
    print(f"Query: {task.query}")
    print(f"Output: {task.output[:200]}...")
    print(f"Sources: {len(task.sources)} cited")
    print(f"Cost: ${task.cost}")

# Paginate through all results
last_key = results.pagination.last_key
while last_key:
    next_page = client.batch.list_tasks(batch_id, status="completed", include_output=True, last_key=last_key)
    for task in next_page.tasks:
        print(f"Task: {task.deepresearch_id} - {task.query}")
    last_key = next_page.pagination.last_key
By default, include_output is False, returning a lightweight listing with task status only. Set include_output=True when you need the full output, sources, images, and cost for each task.
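The pagination loop above can be wrapped in a small generator that yields every task across pages. This helper is a sketch, not an SDK method; it relies only on the list_tasks() parameters and pagination fields documented above:

```python
def iter_batch_tasks(client, batch_id, status=None, include_output=False):
    """Yield every task in a batch, following pagination cursors.

    Helper sketch, not part of the SDK -- it assumes only the
    documented list_tasks() signature and pagination fields.
    """
    last_key = None
    while True:
        page = client.batch.list_tasks(
            batch_id,
            status=status,
            include_output=include_output,
            last_key=last_key,
        )
        if not page.success or not page.tasks:
            return
        yield from page.tasks
        # Stop when the server reports no further pages.
        last_key = page.pagination.last_key if page.pagination else None
        if not last_key:
            return
```

Usage: `for task in iter_batch_tasks(client, batch_id, status="completed", include_output=True): ...`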

cancel()

Cancel a batch and all its pending/running tasks. Parameters:
| Parameter | Type | Description |
| --- | --- | --- |
| batch_id | str | Batch ID to cancel |

Returns: BatchCancelResponse

Example:
response = client.batch.cancel(batch_id)

if response.success:
    print(f"Batch cancelled: {response.message}")

list()

List all batches for your account. Parameters:
| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| limit | int | 10 | Maximum number of batches to return (max: 100) |

Returns: BatchListResponse

Example:
response = client.batch.list(limit=20)

if response.success and response.batches:
    for batch in response.batches:
        print(f"Batch: {batch.batch_id}")
        print(f"Name: {batch.name or 'Unnamed'}")
        print(f"Status: {batch.status}")
        print(f"Tasks: {batch.counts.total} total, {batch.counts.completed} completed")

wait_for_completion()

Wait for a batch to complete with automatic polling. This method blocks until the batch reaches a terminal state. Parameters:
| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| batch_id | str | required | Batch ID to wait for |
| poll_interval | int | 10 | Seconds between polls |
| max_wait_time | int | 14400 | Maximum wait time in seconds (default: 4 hours) |
| on_progress | Callable[[BatchStatusResponse], None] | None | Callback for progress updates |

Returns: BatchStatusResponse (final status)

Raises:
  • TimeoutError: If max_wait_time is exceeded
  • ValueError: If batch fails or is cancelled
Example:
def on_progress(status):
    if status.success and status.batch:
        counts = status.batch.counts
        print(
            f"Progress: {counts.completed + counts.failed + counts.cancelled}/{counts.total} "
            f"(Running: {counts.running}, Queued: {counts.queued})"
        )

try:
    final_status = client.batch.wait_for_completion(
        batch_id,
        poll_interval=10,
        max_wait_time=3600,  # 1 hour
        on_progress=on_progress
    )

    if final_status.success and final_status.batch:
        print("Batch completed!")
        print(f"Final status: {final_status.batch.status}")

except TimeoutError as e:
    print(f"Timeout: {e}")
except ValueError as e:
    print(f"Error: {e}")

create_and_run()

Convenience method to create a batch and add tasks in one call. Optionally waits for completion. Parameters: All parameters from create() plus:
| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| tasks | List[Union[BatchTaskInput, Dict]] | required | List of task inputs |
| wait | bool | False | If True, wait for batch to complete before returning |
| poll_interval | int | 10 | Seconds between polls when waiting |
| max_wait_time | int | 14400 | Maximum wait time in seconds |
| on_progress | Callable[[BatchStatusResponse], None] | None | Callback for progress updates |

Returns: BatchCreateResponse

Example:
tasks = [
    {"query": "What is the latest in generative AI?"},
    {"query": "Summarize recent ML frameworks"},
    {"query": "What are the top AI startups in 2024?"}
]

# Create and add tasks (don't wait)
batch = client.batch.create_and_run(
    tasks=tasks,
    name="Quick Research Batch",
    mode="standard",
    wait=False
)

# Or create, add tasks, and wait for completion
batch = client.batch.create_and_run(
    tasks=tasks,
    name="Quick Research Batch",
    mode="standard",
    wait=True,
    poll_interval=10,
    max_wait_time=3600,
    on_progress=on_progress
)

Search Configuration

The search parameter controls which data sources are queried. See the Batch Processing Guide for complete documentation of all search options.
from valyu.types.deepresearch import SearchConfig

# Using SearchConfig (recommended)
search_config = SearchConfig(
    search_type="all",
    included_sources=["academic", "web"],
    start_date="2024-01-01",
    end_date="2024-12-31"
)

batch = client.batch.create(
    name="Academic Research",
    mode="standard",
    search=search_config
)

# Or using a dictionary
batch = client.batch.create(
    name="Research Batch",
    search={
        "search_type": "proprietary",
        "included_sources": ["academic", "finance"],
        "start_date": "2024-01-01"
    }
)
Batch-level search parameters are inherited by all tasks and cannot be overridden per-task.

Complete Workflow Example

Here’s a complete example showing the typical batch workflow:
from valyu import Valyu
from valyu.types.deepresearch import BatchTaskInput, SearchConfig
import os

client = Valyu(api_key=os.getenv("VALYU_API_KEY"))

# 1. Create a batch with default settings
search_config = SearchConfig(
    search_type="all",
    included_sources=["web", "academic"],
    start_date="2024-01-01"
)

batch = client.batch.create(
    name="Market Research Q4 2024",
    mode="standard",
    output_formats=["markdown"],
    search=search_config,
    metadata={"project": "Q4-2024", "team": "research"}
)

if not batch.success:
    print(f"Error creating batch: {batch.error}")
    exit(1)

batch_id = batch.batch_id
print(f"Created batch: {batch_id}")

# 2. Add tasks to the batch
tasks = [
    BatchTaskInput(
        id="competitor-1",
        query="Analyze OpenAI's latest product launches",
        strategy="Focus on technical capabilities and market impact",
        urls=["https://openai.com/blog"],
    ),
    BatchTaskInput(
        id="competitor-2",
        query="Analyze Anthropic's Claude AI capabilities",
        strategy="Focus on safety features and enterprise adoption",
    ),
    BatchTaskInput(
        id="trends-1",
        query="What are the latest trends in AI?",
    )
]

add_response = client.batch.add_tasks(batch_id, tasks)

if not add_response.success:
    print(f"Error adding tasks: {add_response.error}")
    exit(1)

print(f"Added {add_response.added} tasks")

# 3. Monitor progress
def on_progress(status):
    if status.success and status.batch:
        counts = status.batch.counts
        print(
            f"Progress: {counts.completed + counts.failed + counts.cancelled}/{counts.total} "
            f"(Running: {counts.running}, Queued: {counts.queued})"
        )

# 4. Wait for completion
try:
    final_status = client.batch.wait_for_completion(
        batch_id,
        poll_interval=10,
        max_wait_time=3600,
        on_progress=on_progress
    )

    if final_status.success and final_status.batch:
        batch_info = final_status.batch
        print("\nBatch completed!")
        print(f"Status: {batch_info.status}")
        print(f"Total tasks: {batch_info.counts.total}")
        print(f"Completed: {batch_info.counts.completed}")
        print(f"Failed: {batch_info.counts.failed}")

        # 5. Get all results with full output
        results = client.batch.list_tasks(batch_id, status="completed", include_output=True)
        if results.tasks:
            print("\nTask Results:")
            for task in results.tasks:
                print(f"  - {task.task_id or task.deepresearch_id}")
                print(f"    Query: {task.query}")
                print(f"    Output: {task.output[:200]}...")
                print(f"    Sources: {len(task.sources)} cited")
                print(f"    Cost: ${task.cost}")

except TimeoutError as e:
    print(f"Timeout: {e}")
except ValueError as e:
    print(f"Error: {e}")

Response Types

# BatchCreateResponse
{
    "success": bool,
    "batch_id": Optional[str],
    "name": Optional[str],
    "status": Optional[BatchStatus],
    "mode": Optional[DeepResearchMode],
    "output_formats": Optional[List[...]],
    "search_params": Optional[Dict[str, Any]],
    "counts": Optional[BatchCounts],
    "cost": Optional[float],
    "created_at": Optional[str],
    "completed_at": Optional[str],
    "webhook_secret": Optional[str],  # Only on creation
    "error": Optional[str]
}
# BatchAddTasksResponse
{
    "success": bool,
    "batch_id": Optional[str],
    "added": Optional[int],
    "tasks": Optional[List[BatchTaskCreated]],
    "counts": Optional[BatchCounts],
    "error": Optional[str]
}

# BatchTaskCreated
{
    "task_id": Optional[str],
    "deepresearch_id": str,
    "status": str
}
# BatchStatusResponse
{
    "success": bool,
    "batch": Optional[DeepResearchBatch],
    "error": Optional[str]
}
# BatchTasksListResponse
{
    "success": bool,
    "batch_id": Optional[str],
    "tasks": Optional[List[BatchTaskListItem]],
    "pagination": Optional[BatchPagination],
    "error": Optional[str]
}

# BatchTaskListItem
{
    "task_id": Optional[str],
    "deepresearch_id": str,
    "query": str,
    "status": DeepResearchStatus,
    "created_at": Union[int, str],
    "completed_at": Optional[Union[int, str]],
    # Additional fields when include_output=True:
    "output_type": Optional[str],
    "output": Optional[str],
    "sources": Optional[List[Source]],
    "images": Optional[List[str]],
    "pdf_url": Optional[str],
    "deliverables": Optional[Any],
    "error": Optional[str],
    "cost": Optional[float]
}

# BatchPagination
{
    "count": int,
    "last_key": Optional[str],
    "has_more": bool
}
# BatchListResponse
{
    "success": bool,
    "batches": Optional[List[DeepResearchBatch]],
    "error": Optional[str]
}

Best Practices

  1. Use descriptive batch names: Make it easy to identify batches later
  2. Set appropriate defaults: Configure batch-level settings that apply to most tasks
  3. Monitor progress: Use wait_for_completion() with progress callbacks for long-running batches
  4. Handle errors gracefully: Check success fields and handle errors appropriately
  5. Use metadata: Add metadata to batches and tasks for easier filtering and organization
  6. Set webhooks: Use webhook URLs for async completion notifications instead of polling
  7. Task organization: Use custom task IDs to map results back to your system

Error Handling

All batch methods return response objects with a success field. Always check this before proceeding:
response = client.batch.create(...)

if not response.success:
    print(f"Error: {response.error}")
    # Handle error appropriately
    return

# Proceed with successful response
batch_id = response.batch_id
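When several batch calls are chained, the success check can be factored into a small helper that raises instead of returning. This is a convenience sketch, not part of the SDK; it assumes only the documented success and error fields on response objects:

```python
def require_success(response, context=""):
    """Raise RuntimeError if a batch response reports failure.

    Helper sketch, not an SDK method -- relies only on the documented
    `success` and `error` fields present on all batch responses.
    """
    if not response.success:
        detail = f" ({context})" if context else ""
        raise RuntimeError(f"Batch API error{detail}: {response.error}")
    return response
```

Usage: `batch = require_success(client.batch.create(name="Research Batch"), context="create")`.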

Webhooks

You can configure a webhook URL when creating a batch to receive notifications when the batch completes:
batch = client.batch.create(
    name="Research Batch",
    mode="standard",
    webhook_url="https://your-domain.com/webhook"
)
The webhook will receive a POST request when the batch reaches a terminal state (completed, completed_with_errors, or cancelled).
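On the receiving side, the handler only needs to parse the POST body and check for a terminal state. The sketch below hedges on the payload shape: the field names ("batch_id", "status") are assumptions; consult the Batch Processing Guide for the actual webhook schema. The terminal states themselves (completed, completed_with_errors, cancelled) are as documented above:

```python
import json

# Terminal batch states, as documented above.
TERMINAL_STATES = {"completed", "completed_with_errors", "cancelled"}

def handle_webhook(body: bytes) -> bool:
    """Parse a webhook POST body and report whether the batch is done.

    Sketch only: the payload field names ("batch_id", "status") are
    assumptions, not the documented webhook schema.
    """
    payload = json.loads(body)
    status = payload.get("status")
    if status in TERMINAL_STATES:
        print(f"Batch {payload.get('batch_id')} finished with status {status}")
        return True
    return False
```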

Limitations

Batch tasks do not support: files, deliverables, mcp_servers, or previous_reports. Use client.deepresearch.create() for these features.
| Constraint | Value |
| --- | --- |
| Maximum tasks per request | 100 |
| Minimum tasks per request | 1 |
| Batch status to add tasks | open or processing |

Inherited settings (cannot be overridden per-task): mode, output_formats, search_params. Per-task overrides allowed: strategy, urls, metadata. See the Batch Processing Guide for complete details.
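Because add_tasks() accepts at most 100 tasks per request, larger task lists must be split across calls. A minimal chunking helper might look like this (a sketch, not an SDK method; it uses only the documented add_tasks() signature and response fields):

```python
def add_tasks_chunked(client, batch_id, tasks, chunk_size=100):
    """Add tasks in chunks no larger than the documented per-request
    maximum of 100. Helper sketch, not an SDK method."""
    added = 0
    for i in range(0, len(tasks), chunk_size):
        response = client.batch.add_tasks(batch_id, tasks[i:i + chunk_size])
        if not response.success:
            raise RuntimeError(f"Failed to add chunk starting at {i}: {response.error}")
        added += response.added or 0
    return added
```

Usage: `add_tasks_chunked(client, batch_id, [{"query": q} for q in queries])`.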

See Also