Skip to main content

Documentation Index

Fetch the complete documentation index at: https://docs.valyu.ai/llms.txt

Use this file to discover all available pages before exploring further.

The Valyu Python SDK provides comprehensive access to all Valyu APIs, enabling you to build powerful AI applications with search, content extraction, answer generation, and deep research capabilities.

Features

The SDK includes four core APIs:
  • Search API - Advanced search across web and proprietary data sources
  • Contents API - Extract and process content from URLs with AI
  • Answer API - AI-powered answer generation with search integration
  • DeepResearch API - Async deep research with comprehensive reports

Installation

Install the Valyu Python SDK using pip:
pip install valyu

Authentication

Get your API key from Valyu Platform (free $10 credits included). Set up authentication in one of two ways:
export VALYU_API_KEY="your-api-key-here"
from valyu import Valyu

# Automatically uses VALYU_API_KEY from environment
valyu = Valyu()

Direct API Key

from valyu import Valyu

valyu = Valyu("your-api-key-here")

Quick Start

Here’s a simple example to get you started with search:
from valyu import Valyu

valyu = Valyu()

# Basic search example
search_response = valyu.search(
    "What are the latest developments in quantum computing?"
)

print(f"Found {len(search_response.results)} results")
for result in search_response.results:
    print(f"Title: {result.title}")
    print(f"URL: {result.url}")
    print(f"Content preview: {result.content[:200]}...")

Error Handling

The SDK includes built-in error handling and validation:
response = valyu.search("test query")

if not response.success:
    print("Search failed:", response.error)
    return

# Process successful results
for result in response.results:
    # Handle each result
    pass

Async Usage

AsyncValyu is the async/await counterpart to Valyu. It’s built on httpx.AsyncClient and takes exactly the same arguments, returns the same response objects, and applies the same validation as the synchronous client. The only difference is that every method is a coroutine — you await it instead of calling it directly.
import asyncio
from valyu import AsyncValyu

async def main():
    async with AsyncValyu() as valyu:
        response = await valyu.search("quantum computing")
        for r in response.results:
            print(r.title, "—", r.url)

asyncio.run(main())
Scope a search to a specific data source the same way as sync:
response = await valyu.search(
    "Real options valuation binomial Monte Carlo natural resource R&D",
    included_sources=["wiley/wiley-finance-papers", "wiley/wiley-finance-books"],
)

When to use it

Reach for AsyncValyu when any of these apply. If none apply, the synchronous Valyu client is simpler and just as fast.
SituationWhy async helps
You’re running several Valyu calls per user request (an agent fanning out sub-queries, multi-source research, etc.)asyncio.gather runs them all in parallel instead of one after another.
You’re extracting many URLs with the Contents APIDispatch them concurrently instead of polling one URL at a time.
You’re building a web service (FastAPI, Starlette, aiohttp) that calls Valyu during request handlingAwait the call without tying up a worker thread; serve more concurrent users per process.
You’re already inside an async code base (LLM SDKs, async database drivers, queue consumers)Drop-in — no run_in_executor gymnastics.
You need to cap in-flight requests preciselyOne asyncio.Semaphore is all you need; no thread pool to tune.
Sticking with sync is the right call for one-off scripts, Jupyter notebooks doing a single query, or CLIs that fire one request and exit.

Patterns

Single await

async with AsyncValyu() as valyu:
    response = await valyu.search("quantum computing")
Every kwarg from the sync searchsearch_type, max_num_results, included_sources, start_date, end_date, category, country_code, relevance_threshold, source_biases, instructions, fast_mode, etc. — works identically on async:
response = await valyu.search(
    "climate tech funding",
    search_type="news",
    max_num_results=15,
    start_date="2024-01-01",
    country_code="US",
)

Fan-out — run many queries in parallel

Perfect for research agents or any place where you have a known, modest-size set of queries:
import asyncio
from valyu import AsyncValyu

queries = [
    "DCF terminal value assumptions",
    "Black-Scholes binomial tree derivation",
    "GARCH volatility forecasting",
    "ESG disclosure cost of capital",
]

async def main():
    async with AsyncValyu() as valyu:
        responses = await asyncio.gather(*[
            valyu.search(q, max_num_results=10) for q in queries
        ])
        for q, r in zip(queries, responses):
            print(f"{q}: {len(r.results)} results")

asyncio.run(main())
All four requests open simultaneously. Total wall time is the slowest of the four, not the sum.

Bounded fan-out — cap how many are in flight

When you have hundreds of queries but only want (for example) 20 hitting the API at once, use an asyncio.Semaphore:
import asyncio
from valyu import AsyncValyu

queries = [...]  # hundreds of queries

async def search_one(valyu, sem, query):
    async with sem:
        return await valyu.search(query, max_num_results=10)

async def main():
    async with AsyncValyu(max_connections=20) as valyu:
        sem = asyncio.Semaphore(20)
        responses = await asyncio.gather(*[
            search_one(valyu, sem, q) for q in queries
        ])

asyncio.run(main())
The semaphore gates coroutine entry; max_connections=20 sizes the underlying HTTP pool to match. Keep these two numbers aligned.

Stream results as they arrive

Use asyncio.as_completed when you want to process each result the moment it’s ready rather than waiting for the whole batch:
import asyncio
from valyu import AsyncValyu

queries = [...]

async def main():
    async with AsyncValyu() as valyu:
        tasks = [valyu.search(q, max_num_results=5) for q in queries]
        for coro in asyncio.as_completed(tasks):
            response = await coro
            print(response.query, len(response.results))

asyncio.run(main())

Inside a FastAPI endpoint

Instantiate the client once at startup and share it across requests. Don’t create a fresh AsyncValyu per request — you’d throw away the connection pool every time.
from contextlib import asynccontextmanager
from fastapi import FastAPI
from valyu import AsyncValyu

@asynccontextmanager
async def lifespan(app: FastAPI):
    app.state.valyu = AsyncValyu()
    try:
        yield
    finally:
        await app.state.valyu.aclose()

app = FastAPI(lifespan=lifespan)

@app.get("/search")
async def search(q: str):
    response = await app.state.valyu.search(q)
    return {"results": [r.model_dump() for r in response.results]}

Constructor options

ParameterTypeDescriptionDefault
api_keyOptional[str]API key. Falls back to the VALYU_API_KEY env var.None
base_urlstrBase URL of the Valyu API.https://api.valyu.ai/v1
max_connectionsintMaximum simultaneous HTTP connections the client pool will open. Raise when fan-out is high; lower on constrained environments.100
max_keepalive_connectionsOptional[int]Idle connections kept warm for reuse between requests.max(20, max_connections // 5)
timeoutfloatPer-request timeout in seconds. Applies to every outbound request.600.0
http_clientOptional[httpx.AsyncClient]Pre-configured client to use instead of the default. Ownership stays with the caller — aclose() won’t close it.None
A good starting point for max_connections is the number of concurrent requests you expect in flight at peak — match it to your semaphore or your expected user concurrency. Set it too low and requests queue behind each other inside httpx; set it too high and you keep more TCP sockets open than you need. 100 is comfortable for most workloads.

Lifecycle

Prefer async with so the connection pool is released deterministically:
async with AsyncValyu() as valyu:
    ...
For long-lived services, instantiate the client once and call aclose() during shutdown:
valyu = AsyncValyu()
try:
    ...
finally:
    await valyu.aclose()
aclose() is idempotent. Calling it more than once is safe.

Bring your own httpx client

Pass an existing httpx.AsyncClient to reuse proxies, custom TLS settings, or a shared connection pool across multiple libraries. When you inject a client, ownership stays with you — aclose() won’t close it:
import httpx
from valyu import AsyncValyu

http = httpx.AsyncClient(
    timeout=30.0,
    limits=httpx.Limits(max_connections=200),
)

valyu = AsyncValyu(http_client=http)
# ... use valyu ...
await valyu.aclose()  # does NOT close `http`
await http.aclose()   # you own this

Things to avoid

  • Creating a new AsyncValyu per request. You’ll tear down and rebuild the connection pool on every call — slower than sync. Hold one instance for the lifetime of your service or script.
  • Mixing asyncio.run inside an already-running event loop. If you’re inside a framework that manages its own loop (Jupyter, FastAPI, etc.), call the coroutine with await, not asyncio.run.
  • Leaving the client open at process exit. Always aclose() or use async with; otherwise httpx will warn about unclosed connections.
AsyncValyu currently exposes search, contents, get_contents_job, and wait_for_contents_job. The remaining endpoints (answer, deepresearch, datasources) are only available on the synchronous Valyu client today; await-style counterparts will follow.

Next Steps

Explore the detailed documentation for each API:

Search API

Advanced search across web and proprietary sources

Contents API

Extract and process web content with AI

Answer API

Generate AI-powered answers with search

DeepResearch API

Async deep research with comprehensive reports

Support