Data Source API Reference¶
Complete API reference for data sources and the BaseDataSource class.
🤖 AI-Generated Content
This documentation was generated with AI assistance and is still being audited. Some, or potentially a lot, of this information may be inaccurate. Learn more.
Base Data Source Class¶
Class Attributes¶
| Attribute | Type | Required | Description |
|---|---|---|---|
| `config_class` | `Type[attrs.define]` | Yes | Configuration attrs class (inputs) |
| `state_class` | `Type[attrs.define]` | Yes | State attrs class (outputs) |
Required Methods¶
read()¶
async def read(self, ctx: ResourceContext) -> StateType | None:
"""Execute query and return data."""
Purpose: Fetch data based on user configuration.
Parameters:
- ctx: ResourceContext containing configuration and context
Returns:
- StateType | None: Query results, or None if config unavailable
When Called: During every terraform plan and terraform apply
Example:
async def read(self, ctx: ResourceContext) -> ServerQueryData | None:
if not ctx.config:
return None
servers = await api.list_servers(
filter=ctx.config.filter,
limit=ctx.config.limit,
)
return ServerQueryData(
id=f"{ctx.config.filter}:{ctx.config.limit}",
servers=servers,
count=len(servers),
)
Important:
- Data sources use the same ResourceContext API as resources
- Access configuration via ctx.config
- Return None if config is unavailable
- Unlike resources, data sources re-fetch data on every Terraform operation
get_schema()¶
Purpose: Define input parameters and output attributes.
Returns: PvsSchema object
Example:
from pyvider.schema import s_data_source, a_str, a_num, a_list
@classmethod
def get_schema(cls) -> PvsSchema:
return s_data_source({
# Inputs (from user)
"filter": a_str(required=True, description="Filter expression"),
"limit": a_num(default=10, description="Max results"),
# Outputs (computed by data source)
"id": a_str(computed=True, description="Query ID"),
"servers": a_list(a_str(), computed=True, description="Server list"),
"count": a_num(computed=True, description="Number of servers"),
})
Important: All outputs MUST have computed=True.
Type Signatures¶
Configuration Class¶
Input parameters from user:
import attrs
@attrs.define
class ServerQueryConfig:
"""User-provided query parameters."""
filter: str
limit: int = 10
region: str | None = None
Rules:
- All fields represent Terraform inputs
- Use defaults for optional parameters
- Use str | None for nullable inputs
Data Class¶
Query results returned to user:
import attrs
@attrs.define
class ServerQueryData:
"""Query results."""
id: str # Required: Stable query ID
servers: list[dict] # Query results
count: int # Result metadata
next_page: str | None # Optional: Pagination token
Rules:
- id field is required (must be deterministic)
- All fields are outputs (returned to Terraform)
- Use appropriate types (list, dict, str, int, bool)
Schema Attributes¶
Input Attributes¶
"filter": a_str(required=True, description="Filter expression")
"limit": a_num(default=10, description="Max results")
"region": a_str(description="AWS region") # Optional (no default)
Modifiers:
- required=True - User must provide value
- default=value - Optional with default
- No required or default - Optional (null allowed)
Output Attributes¶
"id": a_str(computed=True, description="Query ID")
"results": a_list(a_map(a_str()), computed=True, description="Results")
"count": a_num(computed=True, description="Result count")
"error": a_str(computed=True, description="Error message if query failed")
Rules:
- ALL outputs must have computed=True
- Cannot have required or default (they're outputs)
- Use descriptive descriptions
ID Generation¶
Data source IDs must be deterministic (same inputs = same ID):
Good: Deterministic¶
async def read(self, config: Config) -> Data:
# Generate stable ID from inputs
query_id = f"{config.endpoint}:{config.filter}:{config.limit}"
results = await api.query(...)
return Data(
id=query_id, # Same config always = same ID
results=results,
)
Bad: Non-Deterministic¶
import uuid
async def read(self, config: Config) -> Data:
return Data(
id=str(uuid.uuid4()), # Different ID every time!
results=results,
)
Hash-Based IDs¶
For complex configurations, hash the sorted config values (MD5 is acceptable here — the hash serves as a stable identifier, not a security control):
import hashlib
import json
async def read(self, config: Config) -> Data:
# Create hash of all config values
config_str = json.dumps({
"endpoint": config.endpoint,
"filter": config.filter,
"limit": config.limit,
}, sort_keys=True)
query_id = hashlib.md5(config_str.encode()).hexdigest()
return Data(id=query_id, ...)
Error Handling¶
Return Data With Error Field¶
@attrs.define
class QueryData:
id: str
results: list[dict]
error: str | None = None # Error field
async def read(self, config: Config) -> QueryData:
try:
results = await api.query(config.endpoint)
return QueryData(
id=config.endpoint,
results=results,
error=None,
)
except APIError as e:
return QueryData(
id=config.endpoint,
results=[],
error=str(e),
)
Don't Raise Exceptions¶
# Good: Return data with error
async def read(self, config: Config) -> Data:
try:
return await query(config)
except Exception as e:
return Data(id=id, results=[], error=str(e))
# Bad: Raise exception
async def read(self, config: Config) -> Data:
result = await api.query() # Might raise!
return Data(id=id, results=result)
Common Patterns¶
Handle Missing Data¶
async def read(self, config: Config) -> Data:
result = await api.get(config.resource_id)
if not result:
# Return empty data instead of None
return Data(
id=config.resource_id,
found=False,
value=None,
)
return Data(
id=result["id"],
found=True,
value=result["value"],
)
Boolean Outputs¶
@attrs.define
class FileInfoData:
id: str
exists: bool # Use bool for yes/no outputs
readable: bool
size: int | None # None if doesn't exist
async def read(self, config: Config) -> FileInfoData:
from pathlib import Path
path = Path(config.path)
return FileInfoData(
id=str(path.absolute()),
exists=path.exists(),
readable=path.exists() and os.access(path, os.R_OK),
size=path.stat().st_size if path.exists() else None,
)
List Outputs¶
from pyvider.schema import s_data_source, a_list, a_map, a_str
@classmethod
def get_schema(cls) -> PvsSchema:
return s_data_source({
"query": a_str(required=True),
"id": a_str(computed=True),
# List of strings
"tags": a_list(a_str(), computed=True),
# List of objects
"servers": a_list(
a_map(a_str()), # Each server is a map
computed=True
),
})
Caching¶
Cache expensive queries. Caution: `functools.lru_cache` on an `async def` method caches the coroutine object rather than its result, and a coroutine can only be awaited once — the pattern below fails on a repeated call with identical arguments. For async code, cache the awaited result instead (e.g. in a dict keyed by the query parameters):
from functools import lru_cache
class ServerQuery(BaseDataSource):
@lru_cache(maxsize=128)
async def _fetch_servers(self, filter_str: str, limit: int) -> list[dict]:
"""Cached query."""
return await api.list_servers(filter=filter_str, limit=limit)
async def read(self, config: Config) -> Data:
# Query is cached by filter+limit
servers = await self._fetch_servers(config.filter, config.limit)
return Data(
id=f"{config.filter}:{config.limit}",
servers=servers,
count=len(servers),
)
Validation¶
Data sources don't have _validate_config(). Validate in read():
async def read(self, config: Config) -> Data:
# Validate inputs
errors = []
if not config.endpoint.startswith("/"):
errors.append("Endpoint must start with /")
if config.limit < 1 or config.limit > 1000:
errors.append("Limit must be between 1 and 1000")
if errors:
return Data(
id="error",
results=[],
error="; ".join(errors),
)
# Proceed with query
...
Performance Considerations¶
Idempotency¶
Reads should be idempotent (same result every time for same inputs):
# Good: Idempotent
async def read(self, config: Config) -> Data:
# Same query always returns same results
servers = await api.list_servers(filter=config.filter)
return Data(id=config.filter, servers=servers)
# Bad: Not idempotent
async def read(self, config: Config) -> Data:
# Returns different results each time!
servers = await api.list_recent_servers()
return Data(id=str(uuid.uuid4()), servers=servers)
Minimize API Calls¶
# Good: Single API call
async def read(self, config: Config) -> Data:
response = await api.get_user_with_posts(config.user_id)
return Data(
id=config.user_id,
user=response["user"],
posts=response["posts"],
)
# Bad: Multiple API calls
async def read(self, config: Config) -> Data:
user = await api.get_user(config.user_id)
posts = await api.get_posts(config.user_id) # Separate call
return Data(id=config.user_id, user=user, posts=posts)
Testing¶
import pytest
from my_provider.data_sources.server_query import ServerQuery, ServerQueryConfig
@pytest.mark.asyncio
async def test_server_query():
ds = ServerQuery()
config = ServerQueryConfig(filter="status=running", limit=5)
data = await ds.read(config)
assert data.count <= 5
assert len(data.servers) == data.count
assert data.id == "status=running:5" # Deterministic ID
@pytest.mark.asyncio
async def test_server_query_error_handling():
ds = ServerQuery()
config = ServerQueryConfig(filter="invalid!", limit=5)
data = await ds.read(config)
# Should return data with error, not raise
assert data.error is not None
assert data.servers == []
Complete Example¶
import attrs
import httpx
from pyvider.data_sources import register_data_source, BaseDataSource
from pyvider.resources.context import ResourceContext
from pyvider.schema import s_data_source, a_str, a_num, a_list, a_map, PvsSchema
@attrs.define
class ServerQueryConfig:
region: str
limit: int = 10
@attrs.define
class ServerQueryData:
id: str
servers: list[dict[str, str]]
count: int
@register_data_source("servers")
class ServerQuery(BaseDataSource):
config_class = ServerQueryConfig
state_class = ServerQueryData
@classmethod
def get_schema(cls) -> PvsSchema:
return s_data_source({
# Inputs
"region": a_str(required=True, description="AWS region"),
"limit": a_num(default=10, description="Max servers"),
# Outputs
"id": a_str(computed=True, description="Query ID"),
"servers": a_list(
a_map(a_str()),
computed=True,
description="Server list"
),
"count": a_num(computed=True, description="Server count"),
})
async def read(self, ctx: ResourceContext) -> ServerQueryData | None:
if not ctx.config:
return None
async with httpx.AsyncClient() as client:
response = await client.get(
"https://api.example.com/servers",
params={"region": ctx.config.region, "limit": ctx.config.limit}
)
servers = response.json()["servers"]
return ServerQueryData(
id=f"{ctx.config.region}:{ctx.config.limit}",
servers=servers,
count=len(servers),
)
See Also¶
- Create a Data Source - How-to guide
- Building Your First Data Source - Tutorial
- Handle Pagination - Pagination patterns
- API Reference - Auto-generated API docs