DeepResearch is an asynchronous research API that performs comprehensive research by searching multiple sources, analyzing content, and generating detailed reports. Unlike synchronous APIs, DeepResearch runs tasks in the background, allowing for thorough multi-step research that can take minutes to complete.
When to Use DeepResearch
Use DeepResearch when you need:
In-depth analysis - Complex research across multiple sources
Report generation - Markdown or PDF output with citations
Structured data extraction - Research results in custom JSON formats
Background processing - Long-running research without blocking your application
For quick answers to simple questions, consider using the Answer API instead.
Features
Multi-Source Research - Searches web, academic, and proprietary sources in a single task.
Research Modes - Choose lite for quick research or heavy for complex analysis.
Multiple Outputs - Get results as markdown, PDF, or structured JSON.
File Analysis - Attach PDFs, images, and documents for analysis.
URL Extraction - Include specific URLs to analyze as part of research.
Webhooks - Get notified when research completes.
Quick Start
Create a Research Task
from valyu import Valyu

valyu = Valyu()

# Create a research task
task = valyu.deepresearch.create(
    input="What are the key differences between RAG and fine-tuning for LLMs?",
    model="lite"
)
print(f"Task created: {task.deepresearch_id}")
print(f"Status: {task.status}")
Wait for Completion
# Wait for the task to complete
result = valyu.deepresearch.wait(
    task.deepresearch_id,
    poll_interval=5,    # Check every 5 seconds
    max_wait_time=900   # Timeout after 15 minutes (lite mode)
)

if result.status == "completed":
    print("Research completed!")
    print(result.output)

    # Access sources used
    for source in result.sources:
        print(f"- {source.title}: {source.url}")

    # Check costs
    print(f"Total cost: ${result.usage.total_cost:.4f}")
Task Statuses
When you create a task, it goes through the following statuses:
| Status | Description |
| --- | --- |
| queued | Task is waiting to start due to rate limits or capacity |
| running | Task is actively researching |
| completed | Research finished successfully |
| failed | Research failed (check error field) |
| cancelled | Task was cancelled by user |
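The statuses above fall into two groups: queued and running are still in flight, while completed, failed, and cancelled are final. A minimal sketch of that distinction, assuming the status strings are exactly those listed; `TaskStatus`, `TERMINAL_STATUSES`, and `is_terminal` are hypothetical helper names and not part of the SDK:

```python
from enum import Enum


class TaskStatus(str, Enum):
    """Status strings as listed in the table above."""
    QUEUED = "queued"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"


# Statuses after which a task no longer changes state.
TERMINAL_STATUSES = {TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.CANCELLED}


def is_terminal(status: str) -> bool:
    """Return True if the task has reached a final state."""
    # TaskStatus(status) raises ValueError for a string not in the table.
    return TaskStatus(status) in TERMINAL_STATUSES
```

A check such as `is_terminal(task.status)` can then decide whether further polling is worthwhile.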
Queued Tasks
Tasks may be queued when:
Your organization has multiple concurrent tasks running
System capacity is temporarily limited
Queued tasks start automatically when capacity becomes available. No action is required.
task = valyu.deepresearch.create(input="Research query")

if task.status == "queued":
    print(f"Task queued: {task.message}")

# Task will start automatically - just wait for it
result = valyu.deepresearch.wait(task.deepresearch_id)
The wait method handles queued tasks automatically. It continues polling until the task completes, fails, or is cancelled.
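To make that behavior concrete, here is a rough sketch of a polling loop that treats queued and running alike and stops on a final status. It is not the SDK's implementation of wait; `poll_until_done` and its `get_status`, `sleep`, and `clock` parameters are hypothetical, chosen so the loop can be exercised without network access:

```python
import time
from typing import Callable

TERMINAL = {"completed", "failed", "cancelled"}


def poll_until_done(
    get_status: Callable[[], str],
    poll_interval: float = 5.0,
    max_wait_time: float = 900.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> str:
    """Poll get_status() until a terminal status or until max_wait_time elapses."""
    deadline = clock() + max_wait_time
    while True:
        status = get_status()
        if status in TERMINAL:
            return status
        # "queued" and "running" both mean "keep waiting".
        if clock() >= deadline:
            raise TimeoutError(f"Task still {status!r} after {max_wait_time}s")
        sleep(poll_interval)
```

With the SDK, `get_status` could be a small lambda around `valyu.deepresearch.status(task_id).status`.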
Research Modes
| Mode | Use Case | Typical Completion Time |
| --- | --- | --- |
| lite | Quick research, straightforward questions, fact-checking | 5-10 minutes |
| heavy | Complex analysis, multi-faceted topics, detailed reports | 15-30 minutes |
# Use heavy mode for complex research
task = valyu.deepresearch.create(
    input="Analyze the competitive landscape of the cloud computing market in 2024",
    model="heavy"
)
Output Formats
Markdown (Default)
task = valyu.deepresearch.create(
    input="Explain quantum computing advancements",
    output_formats=["markdown"]
)
Markdown + PDF
task = valyu.deepresearch.create(
    input="Write a report on renewable energy trends",
    output_formats=["markdown", "pdf"]
)

# After completion, access the PDF URL
result = valyu.deepresearch.wait(task.deepresearch_id)
if result.pdf_url:
    print(f"PDF available at: {result.pdf_url}")
Structured JSON
Get research results in a custom schema using JSON Schema specification:
task = valyu.deepresearch.create(
    input="Research competitor pricing in the SaaS market",
    output_formats=[{
        "type": "object",
        "properties": {
            "competitors": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "name": {"type": "string"},
                        "pricing_model": {"type": "string"},
                        "price_range": {"type": "string"},
                        "key_features": {
                            "type": "array",
                            "items": {"type": "string"}
                        }
                    },
                    "required": ["name", "pricing_model"]
                }
            },
            "market_summary": {"type": "string"},
            "recommendations": {
                "type": "array",
                "items": {"type": "string"}
            }
        },
        "required": ["competitors", "market_summary"]
    }]
)
You cannot mix JSON Schema with markdown/pdf formats. Use one or the other.
The schema must be valid JSON Schema. Use type, properties, required, items, and other standard JSON Schema keywords.
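Because a schema cannot be combined with the markdown/pdf formats, it can help to catch the mistake before sending the request. A minimal client-side sketch, assuming named formats are passed as strings and schemas as dicts, as in the examples above; `check_output_formats` is a hypothetical helper, and it does not validate the schema itself:

```python
from typing import Any, List

NAMED_FORMATS = {"markdown", "pdf"}


def check_output_formats(output_formats: List[Any]) -> str:
    """Return "named" or "schema"; raise ValueError if the two kinds are mixed."""
    has_named = any(isinstance(f, str) for f in output_formats)
    has_schema = any(isinstance(f, dict) for f in output_formats)
    if has_named and has_schema:
        raise ValueError("Do not mix a JSON Schema with markdown/pdf formats")
    for f in output_formats:
        if isinstance(f, str) and f not in NAMED_FORMATS:
            raise ValueError(f"Unknown output format: {f!r}")
    return "schema" if has_schema else "named"
```

Calling it on the list just before `valyu.deepresearch.create(...)` turns a server-side rejection into an immediate local error.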
Search Configuration
Filter which sources the research uses:
task = valyu.deepresearch.create(
    input="Latest AI research in healthcare diagnostics",
    model="heavy",
    search={
        "search_type": "all",  # "all", "web", or "proprietary"
        "included_sources": ["pubmed", "arxiv", "nature.com"],
        "start_date": "2024-01-01",
        "end_date": "2025-01-01"
    }
)
Search Types
| Type | Description |
| --- | --- |
| all | Search web and proprietary sources (default) |
| web | Web sources only |
| proprietary | Academic and premium sources only |
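A search configuration can be sanity-checked locally before a task is created. The sketch below assumes dates use the YYYY-MM-DD form shown in the example and that the three search types above are the only valid ones; `check_search_config` is a hypothetical helper and does not inspect included_sources:

```python
from datetime import date

SEARCH_TYPES = {"all", "web", "proprietary"}


def check_search_config(search: dict) -> dict:
    """Validate search_type and the date range; return the config unchanged."""
    search_type = search.get("search_type", "all")  # "all" is the documented default
    if search_type not in SEARCH_TYPES:
        raise ValueError(f"search_type must be one of {sorted(SEARCH_TYPES)}")
    start = search.get("start_date")
    end = search.get("end_date")
    # date.fromisoformat raises ValueError on strings that are not valid ISO dates.
    start_d = date.fromisoformat(start) if start else None
    end_d = date.fromisoformat(end) if end else None
    if start_d and end_d and start_d > end_d:
        raise ValueError("start_date must not be after end_date")
    return search
```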
File Attachments
Analyze documents as part of research:
import base64

# Read and encode a PDF
with open("report.pdf", "rb") as f:
    pdf_data = base64.b64encode(f.read()).decode()

task = valyu.deepresearch.create(
    input="Summarize the key findings from this report and compare with current market trends",
    model="heavy",
    files=[{
        "data": f"data:application/pdf;base64,{pdf_data}",
        "filename": "report.pdf",
        "mediaType": "application/pdf",
        "context": "Q4 2024 financial report"
    }]
)
Supported file types include PDFs, images (PNG, JPEG, WebP), and documents.
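When several files of different types are attached, building each entry by hand gets repetitive. A small sketch that produces the same dictionary shape as the example above, assuming the media type can be inferred from the file extension; `build_attachment` is a hypothetical helper, not an SDK function:

```python
import base64
import mimetypes
from pathlib import Path


def build_attachment(path: str, context: str = "") -> dict:
    """Encode a local file as a data URL in the shape used by the files parameter."""
    file_path = Path(path)
    media_type, _ = mimetypes.guess_type(file_path.name)
    if media_type is None:
        raise ValueError(f"Cannot infer a media type for {file_path.name}")
    encoded = base64.b64encode(file_path.read_bytes()).decode("ascii")
    return {
        "data": f"data:{media_type};base64,{encoded}",
        "filename": file_path.name,
        "mediaType": media_type,
        "context": context,
    }
```

The result can be passed directly, for example `files=[build_attachment("report.pdf", "Q4 2024 financial report")]`.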
URL Extraction
Include specific URLs to analyze:
task = valyu.deepresearch.create(
    input="Compare the approaches described in these articles",
    urls=[
        "https://example.com/article-1",
        "https://example.com/article-2"
    ]
)
Task Management
Check Status
status = valyu.deepresearch.status(task_id)
print(f"Status: {status.status}")
if status.progress:
    print(f"Progress: {status.progress.current_step}/{status.progress.total_steps}")
Add Follow-up Instructions
While a task is running, you can add instructions:
valyu.deepresearch.update(
    task_id,
    instruction="Focus more on peer-reviewed sources"
)
Cancel a Task
valyu.deepresearch.cancel(task_id)
Delete a Task
valyu.deepresearch.delete(task_id)
List All Tasks
tasks = valyu.deepresearch.list(api_key_id="your-api-key-id", limit=50)
for task in tasks.data:
    print(f"{task['query']} - {task['status']}")
Webhooks
Webhooks provide real-time notifications when a DeepResearch task completes or fails, eliminating the need for polling.
When to Use Webhooks
| Approach | Best For |
| --- | --- |
| Webhooks | Production systems, serverless architectures, event-driven workflows |
| Polling | Development, debugging, simple scripts, real-time progress tracking |
Setting Up Webhooks
When you provide a webhook_url, the server generates a cryptographic secret for signature verification:
task = valyu.deepresearch.create(
    input="Research market trends",
    webhook_url="https://your-app.com/webhooks/deepresearch"
)

# IMPORTANT: Save the secret immediately - it's only returned once
webhook_secret = task.webhook_secret
print(f"Store this secret securely: {webhook_secret}")
The webhook_secret is only returned in the initial task creation response. Store it securely in your database or secrets manager; you cannot retrieve it later.
Webhook URLs must use HTTPS. HTTP URLs are rejected for security.
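Since HTTP URLs are rejected, a misconfigured environment variable can be caught before the request is made. A minimal sketch using only the standard library; `require_https` is a hypothetical helper:

```python
from urllib.parse import urlparse


def require_https(webhook_url: str) -> str:
    """Return the URL unchanged if it is an absolute HTTPS URL, else raise."""
    parsed = urlparse(webhook_url)
    if parsed.scheme != "https" or not parsed.netloc:
        raise ValueError(f"webhook_url must be an HTTPS URL, got {webhook_url!r}")
    return webhook_url
```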
Webhook Payload
When the task completes or fails, your endpoint receives a POST request with the full task data:
{
  "deepresearch_id": "f992a8ab-4c91-4322-905f-190107bd5a5b",
  "status": "completed",
  "mode": "lite",
  "query": "Research market trends",
  "output_formats": ["markdown"],
  "output": "# Market Trends Analysis\n\n## Overview...",
  "pdf_url": "https://storage.valyu.ai/reports/...",
  "sources": [
    {
      "title": "Market Analysis Report 2024",
      "url": "https://example.com/report",
      "snippet": "Key findings indicate...",
      "source": "web",
      "word_count": 2500
    }
  ],
  "images": [],
  "usage": {
    "search_cost": 0.0075,
    "contents_cost": 0,
    "ai_cost": 0.15,
    "compute_cost": 0,
    "total_cost": 0.1575
  },
  "error": null,
  "created_at": 1759617800000,
  "updated_at": 1759617836483,
  "completed_at": 1759617836483,
  "search_params": {},
  "code_execution": true,
  "current_step": 5,
  "total_steps": 5
}
Each webhook request includes headers for verification:
| Header | Description |
| --- | --- |
| X-Webhook-Signature | HMAC-SHA256 signature in format sha256=<hex_signature> |
| X-Webhook-Timestamp | Unix timestamp (milliseconds) when the request was sent |
| Content-Type | application/json |
| User-Agent | Valyu-DeepResearch/1.0 |
Verifying Webhook Signatures
Always verify the signature to ensure the webhook is authentic:
import hmac
import hashlib

def verify_webhook(
    payload_body: str,
    signature_header: str,
    timestamp_header: str,
    secret: str
) -> bool:
    """Verify the webhook signature is valid."""
    # Reconstruct the signed payload: timestamp.payload
    signed_payload = f"{timestamp_header}.{payload_body}"

    # Generate expected signature
    expected_signature = "sha256=" + hmac.new(
        secret.encode(),
        signed_payload.encode(),
        hashlib.sha256
    ).hexdigest()

    # Use constant-time comparison to prevent timing attacks
    return hmac.compare_digest(expected_signature, signature_header)

# Example Flask endpoint
from flask import Flask, request, jsonify

app = Flask(__name__)
WEBHOOK_SECRET = "your-stored-secret"  # Load from secure storage

@app.route("/webhooks/deepresearch", methods=["POST"])
def handle_webhook():
    signature = request.headers.get("X-Webhook-Signature")
    timestamp = request.headers.get("X-Webhook-Timestamp")
    payload = request.get_data(as_text=True)

    # A missing header would make compare_digest raise, so reject it first
    if not signature or not timestamp:
        return jsonify({"error": "Missing signature headers"}), 401

    if not verify_webhook(payload, signature, timestamp, WEBHOOK_SECRET):
        return jsonify({"error": "Invalid signature"}), 401

    data = request.json

    if data["status"] == "completed":
        print(f"Research completed: {data['deepresearch_id']}")
        print(f"Output: {data['output'][:200]}...")
    elif data["status"] == "failed":
        print(f"Research failed: {data['error']}")

    return jsonify({"received": True}), 200
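A valid signature does not by itself stop a captured request from being replayed later. Because the timestamp is part of the signed payload, a receiver can additionally reject old timestamps. The sketch below assumes a five-minute tolerance, which is an illustrative choice and not a documented value; `is_fresh` and `sign_for_test` are hypothetical helpers, the latter producing a signature in the documented sha256=<hex_signature> format so a handler can be exercised locally:

```python
import hashlib
import hmac
import time
from typing import Optional

MAX_AGE_MS = 5 * 60 * 1000  # assumed tolerance, not a documented value


def is_fresh(timestamp_header: str, now_ms: Optional[int] = None) -> bool:
    """Return True if the millisecond timestamp is within MAX_AGE_MS of now."""
    try:
        sent_ms = int(timestamp_header)
    except (TypeError, ValueError):
        return False
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    return abs(now_ms - sent_ms) <= MAX_AGE_MS


def sign_for_test(payload_body: str, timestamp_header: str, secret: str) -> str:
    """Build a signature over "timestamp.payload", for local tests only."""
    signed_payload = f"{timestamp_header}.{payload_body}"
    digest = hmac.new(secret.encode(), signed_payload.encode(), hashlib.sha256)
    return "sha256=" + digest.hexdigest()
```

In a handler, the freshness check would run after the signature check, so that only authenticated timestamps are trusted.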
Retry Behavior
The webhook service automatically retries failed deliveries:
| Property | Value |
| --- | --- |
| Maximum retries | 5 attempts |
| Timeout per request | 15 seconds |
| Backoff strategy | Exponential: 1s → 2s → 4s → 8s → 16s |
| 4xx errors | No retry (client error) |
| 5xx errors | Will retry (server error) |
Return a 2xx status code quickly to acknowledge receipt. Process the webhook payload asynchronously to avoid timeouts.
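One way to acknowledge quickly is to hand the payload to a background worker and return immediately. The sketch below uses an in-process queue and also skips payloads whose deepresearch_id was already handled, since a delivery that timed out after being received may be retried. All names here are hypothetical, and a production system would more likely use a durable queue than in-memory state:

```python
import queue
import threading

work_queue: "queue.Queue[dict]" = queue.Queue()
processed_ids = set()   # IDs already handled, to absorb retried deliveries
results = []            # stand-in for real processing output


def enqueue(payload: dict) -> None:
    """Called from the webhook handler: hand off the payload and return at once."""
    work_queue.put(payload)


def worker() -> None:
    """Background loop: process each payload once, off the request thread."""
    while True:
        payload = work_queue.get()
        try:
            task_id = payload.get("deepresearch_id")
            if task_id not in processed_ids:
                processed_ids.add(task_id)
                results.append((task_id, payload.get("status")))
        finally:
            work_queue.task_done()


# A daemon thread stops with the main process.
threading.Thread(target=worker, daemon=True).start()
```

The handler would call `enqueue(data)` after verifying the signature and then return its 2xx response.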
Webhook Events
Webhooks are triggered for:
| Event | When |
| --- | --- |
| completed | Research finished successfully |
| failed | Research encountered an error |
Webhooks are not sent for cancelled tasks. If you need to track cancellations, use the status endpoint or list endpoint to check task states.
Complete Webhook Flow
1. POST /deepresearch with webhook_url
↓
2. API returns task with webhook_secret (store this!)
↓
3. Research executes asynchronously
↓
4. On completion/failure → POST to your webhook_url
• Headers: X-Webhook-Signature, X-Webhook-Timestamp
• Body: Full task payload (output, sources, usage, etc.)
↓
5. Your server verifies signature and processes result
↓
6. Return 2xx to acknowledge receipt
Progress Callbacks
Track progress in real-time:
def on_progress(status):
    if status.progress:
        print(f"Step {status.progress.current_step}/{status.progress.total_steps}")
    print(f"Status: {status.status}")

result = valyu.deepresearch.wait(
    task_id,
    poll_interval=5,
    on_progress=on_progress
)
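With a short poll interval, a callback like the one above prints the same step many times. A small variation that reports only when something changes is sketched below; `make_progress_reporter` is a hypothetical helper, and the `SimpleNamespace` objects merely stand in for the status objects the SDK passes to the callback:

```python
from types import SimpleNamespace


def make_progress_reporter(emit=print):
    """Return a callback that reports only when the step or status changes."""
    last = {"key": None}

    def on_progress(status) -> None:
        progress = getattr(status, "progress", None)
        step = getattr(progress, "current_step", None)
        total = getattr(progress, "total_steps", None)
        key = (status.status, step, total)
        if key == last["key"]:
            return  # nothing new since the previous poll
        last["key"] = key
        if step is not None and total:
            emit(f"[{status.status}] step {step}/{total} ({100 * step // total}%)")
        else:
            emit(f"[{status.status}]")

    return on_progress


# Stand-in status object for a quick local check.
report = make_progress_reporter()
report(SimpleNamespace(status="running",
                       progress=SimpleNamespace(current_step=1, total_steps=5)))
```

The returned function can be passed as `on_progress=make_progress_reporter()`.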
Response Structure
Completed Task
{
  "deepresearch_id": "f992a8ab-4c91-4322-905f-190107bd5a5b",
  "status": "completed",
  "query": "What are the key differences between RAG and fine-tuning?",
  "mode": "lite",
  "output_type": "markdown",
  "output": "# RAG vs Fine-tuning\n\n## Overview...",
  "sources": [
    {
      "title": "Understanding RAG Systems",
      "url": "https://example.com/rag-guide",
      "snippet": "Retrieval-Augmented Generation combines...",
      "source": "web",
      "word_count": 2500
    }
  ],
  "usage": {
    "search_cost": 0.0075,
    "contents_cost": 0,
    "ai_cost": 0.15,
    "compute_cost": 0,
    "total_cost": 0.1575
  },
  "completed_at": 1759617836483,
  "pdf_url": "https://s3.amazonaws.com/..."
}
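A few values in this payload usually need light post-processing. The sketch below assumes completed_at is a Unix timestamp in milliseconds (inferred from its magnitude, not stated for this field) and sums the itemized costs so they can be compared with total_cost; in the example they add up to 0.1575, though whether that always holds is an assumption. `summarize_task` is a hypothetical helper operating on the payload as a plain dict:

```python
from datetime import datetime, timezone


def summarize_task(task: dict) -> dict:
    """Pull a few convenience values out of a completed-task payload."""
    usage = task.get("usage", {})
    # Sum the itemized costs so they can be compared with total_cost.
    itemized = sum(
        usage.get(key, 0)
        for key in ("search_cost", "contents_cost", "ai_cost", "compute_cost")
    )
    completed_ms = task.get("completed_at")
    completed_at = (
        datetime.fromtimestamp(completed_ms / 1000, tz=timezone.utc)
        if completed_ms is not None
        else None
    )
    return {
        "id": task.get("deepresearch_id"),
        "completed_at": completed_at,
        "itemized_cost": round(itemized, 6),
        "source_urls": [s["url"] for s in task.get("sources", [])],
    }
```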
Error Handling
task = valyu.deepresearch.create(input="Research query")

if not task.success:
    print(f"Failed to create task: {task.error}")
else:
    try:
        result = valyu.deepresearch.wait(task.deepresearch_id)
    except TimeoutError:
        print("Task took too long")
        valyu.deepresearch.cancel(task.deepresearch_id)
    except ValueError as e:
        print(f"Task failed: {e}")
Best Practices
Polling Strategy
Lite mode: Poll every 5-10 seconds, timeout after 15 minutes
Heavy mode: Poll every 10-30 seconds, timeout after 45 minutes
Use webhooks for production to avoid polling overhead
Recommended Timeouts
| Mode | Recommended Timeout |
| --- | --- |
| lite | 15 minutes (900 seconds) |
| heavy | 45 minutes (2700 seconds) |
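These recommendations can be kept in one place rather than repeated at each call site. A minimal sketch, assuming the lower bound of each recommended polling range is an acceptable default; `PollingConfig`, `POLLING_BY_MODE`, and `polling_for` are hypothetical names:

```python
from typing import NamedTuple


class PollingConfig(NamedTuple):
    poll_interval: int   # seconds between status checks
    max_wait_time: int   # seconds before giving up


# Lower bound of each recommended polling range, plus the recommended timeout.
POLLING_BY_MODE = {
    "lite": PollingConfig(poll_interval=5, max_wait_time=15 * 60),
    "heavy": PollingConfig(poll_interval=10, max_wait_time=45 * 60),
}


def polling_for(mode: str) -> PollingConfig:
    """Look up polling settings for a research mode."""
    try:
        return POLLING_BY_MODE[mode]
    except KeyError:
        raise ValueError(f"Unknown mode: {mode!r}") from None
```

Since the field names match the wait parameters used earlier, the settings can be unpacked as `valyu.deepresearch.wait(task_id, **polling_for("heavy")._asdict())`.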
Cost Optimization
Use lite mode for simple research
Filter sources with search config to focus on relevant content
Set start_date and end_date to limit scope
Research Quality
Provide clear, specific research queries
Use the strategy parameter to guide the research approach
Add context to file attachments
Limitations
| Limit | Value |
| --- | --- |
| Maximum files per request | 10 |
| Maximum URLs per request | 10 |
| Maximum MCP servers | 5 |
| Maximum previous reports for context | 3 |
| Recommended file size | < 10MB per file |
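The file and URL limits can be checked locally before a request is sent. The sketch below covers only those two, since the parameters for MCP servers and previous reports are not shown in this guide. It reads "< 10MB" as 10 MiB and estimates file size from the base64 data URL, both of which are assumptions; `check_limits` is a hypothetical helper:

```python
MAX_FILES = 10
MAX_URLS = 10
RECOMMENDED_FILE_BYTES = 10 * 1024 * 1024  # "< 10MB" read as 10 MiB; an assumption


def check_limits(files=(), urls=()) -> list:
    """Return a list of human-readable problems; empty means within limits."""
    problems = []
    if len(files) > MAX_FILES:
        problems.append(f"{len(files)} files exceeds the maximum of {MAX_FILES}")
    if len(urls) > MAX_URLS:
        problems.append(f"{len(urls)} URLs exceeds the maximum of {MAX_URLS}")
    for f in files:
        # Estimate the decoded size from the base64 part of the data URL.
        b64 = f.get("data", "").split(",", 1)[-1]
        approx_bytes = len(b64) * 3 // 4
        if approx_bytes >= RECOMMENDED_FILE_BYTES:
            problems.append(f"{f.get('filename', '?')} is larger than recommended")
    return problems
```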