Python SDK (`rivellum-ai`)
The official Python async client for the Rivellum network. Provides two classes:
- RivellumClient — full blockchain query and submission client
- RivellumAgent — AI/agent-centric helper (payments, escrow, marketplace, streams)
Installation
pip install rivellum-ai
# or
uv add rivellum-ai
Requirements
- Python 3.10+
- httpx (HTTP client)
RivellumClient
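Full-featured async blockchain client. Most methods are async and return dicts or lists of dicts; the exceptions are visible in the signatures below (for example, contract() is synchronous and returns a ContractHandle, is_healthy() returns a bool, and some lookup methods return None on a 404).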
Constructor
from rivellum_ai import RivellumClient
client = RivellumClient(
node_url="https://rpc.rivellum.network",
timeout=30.0,
)
| Parameter | Type | Default | Description |
|---|---|---|---|
| node_url | str | "https://api.rivellum.network" | Node RPC base URL |
| timeout | float | 30.0 | Request timeout in seconds |
Async context manager:
async with RivellumClient("https://rpc.rivellum.network") as client:
    balance = await client.get_balance("a1b2...64hex")
    print(balance["balance"])
Health & Info
health()
async def health(self) -> dict
# Returns: {"status": "ok", "service": "rivellum-node"}
is_healthy()
async def is_healthy(self) -> bool
# Returns True if health()["status"] == "ok"
get_chain_info()
async def get_chain_info(self) -> dict
# Returns: {
# "version": "1.0.0",
# "chain_id": "rivellum-mainnet",
# "features": { "batching_enabled": True, "move_vm_enabled": True, ... }
# }
Account
get_balance(address)
async def get_balance(self, address: str) -> dict
# Returns: {
# "address": "...",
# "balance": "1000000000",
# "nonce": 42,
# "balances": {"assetId_hex": "amount"} # Optional asset sub-balances
# }
get_nonce(address)
async def get_nonce(self, address: str) -> dict
# Returns: {"nonce": 42}
get_ledger_tip()
async def get_ledger_tip(self) -> dict
# Returns: {"global_height": 12345, "meta_root": "...hex..."}
Contract (Mist)
contract(contract_id) → ContractHandle
def contract(self, contract_id: str) -> ContractHandle
Returns a ContractHandle for the given contract address.
counter = client.contract("deadbeef...64hex")
result = await counter.call("increment", {"by": 5})
print(result["receipt_id"])
ContractHandle.call(function, args=None)
async def call(self, function: str, args: dict | None = None) -> dict
# Returns: {
# "receipt_id": "...",
# "contract_id": "...",
# "function": "increment",
# "status": "ok"
# }
ContractHandle.get_info()
async def get_info(self) -> dict
# Returns: {"contract_id": "...", "name": "...", "bytecode_hash": "...", "functions": {...}}
deploy_contract(bundle, name=None)
async def deploy_contract(self, bundle: Any, name: str | None = None) -> dict
# Returns: {"contract_id": "...", "receipt_id": "...", "status": "ok"}
get_receipt(receipt_id) / get_receipts()
async def get_receipt(self, receipt_id: str) -> dict
async def get_receipts(self) -> list[dict]
get_pvi(contract_id)
async def get_pvi(self, contract_id: str) -> dict
Returns the PVI (Public Verifiable Interface) specification for a contract.
Envelope Submission
All intent submission goes through encrypted envelopes. Plaintext submission is permanently disabled.
submit_envelope(envelope)
async def submit_envelope(self, envelope: dict) -> dict
# envelope fields: sender, nonce, lane_hint, epoch_id, max_fee, encrypted_payload, signature
# Returns: {"envelope_id": "...", "status": "accepted", "batch_window_id": "..."}
submit_envelope_batch(envelopes)
async def submit_envelope_batch(self, envelopes: list[dict]) -> dict
# Returns: {"accepted": N, "rejected": M}
get_envelope_status(envelope_id)
async def get_envelope_status(self, envelope_id: str) -> dict | None
# Returns None on 404
# Status values: received | admitted | batched | sealed | decrypted | ordered | executed | finalized | rejected
# Returns: {
# "envelope_id": "...",
# "status": "finalized",
# "sender": "...",
# "nonce": 42,
# "lane_id": "...",
# "epoch_id": 5,
# "batch_id": "..." # once batched
# }
get_batch_info(batch_id)
async def get_batch_info(self, batch_id: str) -> dict | None
# Returns None on 404
# Returns: {"batch_id": "...", "state": "sealed", "envelope_count": 120}
get_admission_ticket(intent_id)
async def get_admission_ticket(self, intent_id: str) -> dict | None
# Returns None on 404
# Returns: {"admission_id": "...", "status": "admitted", "sender": "...", "validated_nonce": 42}
Simulation
simulate_intent(intent)
async def simulate_intent(self, intent: dict) -> dict
# Returns: {"success": True, "gas_estimate": 500, "result": {...}}
Events & Ledger
get_recent_events()
async def get_recent_events(self) -> dict
# Returns: {"events": [...], "total": N}
query_events(topic=None, contract=None, address=None, intent_id=None, limit=50, from_cursor=None)
async def query_events(
self,
topic: str | None = None,
contract: str | None = None,
address: str | None = None,
intent_id: str | None = None,
limit: int = 50,
from_cursor: str | None = None,
) -> dict
# Returns: {"events": [...], "next_cursor": "...", "total": N}
events = await client.query_events(topic="native.transfer", limit=20)
for e in events["events"]:
    print(f"Transfer at height {e['batch_height']}: {e['data']}")
# Paginate: keep requesting pages until the response has no next_cursor
all_events = []
cursor = None
while True:
    page = await client.query_events(topic="native.transfer", limit=100, from_cursor=cursor)
    all_events.extend(page["events"])
    cursor = page.get("next_cursor")
    if not cursor:
        break
PoUW
get_provers()
async def get_provers(self) -> list[dict]
# Returns: [{"prover_id": "...", "jobs_completed": 1000, "total_fee_earned": "...", ...}]
get_pending_jobs()
async def get_pending_jobs(self) -> list[dict]
# Returns: [{"job_id": "...", "tier": "High", "fee_budget": "...", "deadline_ms": ...}]
Faucet (Testnet)
faucet_mint(address, amount)
async def faucet_mint(self, address: str, amount: str) -> dict
# Returns: {"success": True, "tx_hash": "...", "minted": "1000000"}
Governance
get_governance_params() / get_governance_history()
async def get_governance_params(self) -> dict
# Returns epoch_length_slots, slot_duration_ms, etc.
async def get_governance_history(self) -> list[dict]
# Returns list of parameter change proposals
NFTs
async def get_nft(self, nft_id: str) -> dict
async def get_nfts_owned_by(self, address: str) -> dict
async def get_nft_transfer_history(self, nft_id: str) -> list[dict]
async def list_collections(self) -> dict
async def get_collection(self, collection_id: str) -> dict
async def get_collection_nfts(self, collection_id: str) -> dict
Prefabs
async def list_prefabs(self) -> list[dict]
async def get_prefab(self, prefab_id: str) -> dict
Bridge & XCM
async def get_bridge_status(self) -> dict
async def get_bridge_chains(self) -> list[dict]
async def get_bridge_messages(self) -> list[dict]
async def get_protocol_upgrades(self) -> list[dict]
Randomness
get_randomness_beacon()
async def get_randomness_beacon(self) -> dict
# Returns: {
# "height": 12345,
# "output": "hex...",
# "proof": "hex...",
# "producer": "addr...",
# "timestamp_ms": 1700000000000,
# "verified": True
# }
Constraint Engine
get_asset_constraints(asset_id) / get_freeze_status(asset_id, account)
async def get_asset_constraints(self, asset_id: str) -> dict
# Returns: {"asset_id": "...", "constrained": True, "policy_id": "...", "paused": False}
async def get_freeze_status(self, asset_id: str, account: str) -> dict
# Returns: {"frozen": False}
Move VM Endpoint Status
Public production APIs do not expose /move/* routes.
The Python SDK keeps the following methods for compatibility, but they intentionally fail fast with a clear error on production networks:
async def call_move_view(...)
async def get_move_module(...)
async def get_move_module_abi(...)
async def get_account_resources(...)
async def get_account_modules(...)
Use Mist and /v1/* RPC capabilities for public production workflows.
RivellumAgent
AI/agent-centric class with session key support and high-level payment primitives. Covers the same functionality as @rivellum/ai-sdk but in Python.
Constructor
from rivellum_ai import RivellumAgent
agent = RivellumAgent(
node_url="https://rpc.rivellum.network",
session_key="sk_...",
timeout_ms=30000,
)
Async context manager:
async with RivellumAgent(node_url="https://rpc.rivellum.network", session_key="sk_...") as agent:
    await agent.pay(to="0xabc...", amount=1_000_000)
Payment Methods
pay(to, amount, max_slippage_bps=None, idempotency_key=None)
async def pay(
self,
to: str,
amount: int,
max_slippage_bps: int | None = None,
idempotency_key: str | None = None,
) -> dict
# Returns: {"accepted": True, "intent_id": "...", "status": "...", "estimated_cost": N}
purchase(service_id, max_price, freshness_secs=None)
async def purchase(
self,
service_id: str,
max_price: int,
freshness_secs: int | None = None,
) -> dict
subscribe(service_id, max_price_per_period, periods)
async def subscribe(
self,
service_id: str,
max_price_per_period: int,
periods: int,
) -> dict
intent(action)
async def intent(self, action: dict) -> dict
# Raw intent — all fields pass through, session_key auto-injected
Escrow Methods
escrow(provider, amount, description, ttl_secs, verification=None)
async def escrow(
self,
provider: str,
amount: int,
description: str,
ttl_secs: int,
verification: str | None = None,
) -> dict
# Returns: {"escrow_id": "...", "state": "active", "amount": N, "expires_at": ms}
release_escrow(escrow_id, proof=None) / dispute_escrow(escrow_id, proof=None)
async def release_escrow(self, escrow_id: str, proof: str | None = None) -> dict
async def dispute_escrow(self, escrow_id: str, proof: str | None = None) -> dict
get_escrow(escrow_id)
async def get_escrow(self, escrow_id: str) -> dict
# Returns: {"escrow_id": "...", "state": "active|released|disputed|expired", ...}
Marketplace
search_market(category=None, max_price=None, min_reputation=None, query=None, limit=None)
async def search_market(
self,
category: str | None = None,
max_price: int | None = None,
min_reputation: float | None = None,
query: str | None = None,
limit: int | None = None,
) -> dict
# Returns: {"results": [{"service_id": "...", "name": "...", "price": "...", ...}], "total": N}
market_data(**kwargs) / market_compute(**kwargs) / market_models(**kwargs)
async def market_data(self, **kwargs) -> dict
async def market_compute(self, **kwargs) -> dict
async def market_models(self, **kwargs) -> dict
register_service(name, category, description, endpoint, pricing_model, price)
async def register_service(
self,
name: str,
category: str,
description: str,
endpoint: str,
pricing_model: str,
price: int,
) -> dict
# Returns: {"service_id": "...", "status": "registered"}
Agent Wallet
create_agent(owner, spending_limit, permissions=None, expires_in_secs=None, kind=None, name=None)
async def create_agent(
self,
owner: str,
spending_limit: int,
permissions: list[str] | None = None,
expires_in_secs: int | None = None,
kind: str | None = None,
name: str | None = None,
) -> dict
# Returns: {"agent_id": "...", "wallet_address": "...", "status": "..."}
get_agent(agent_id) / get_reputation(address) / get_budget(agent_id)
async def get_agent(self, agent_id: str) -> dict
async def get_reputation(self, address: str) -> dict
# Returns: {"address": "...", "score": 95, "success_rate": 0.99, ...}
async def get_budget(self, agent_id: str) -> dict
# Returns: {"total_budget": N, "spent": M, "remaining": K, ...}
create_session(agent_id, max_spend, expires_in_secs, allowed_actions=None, rate_limit_per_minute=None)
async def create_session(
self,
agent_id: str,
max_spend: int,
expires_in_secs: int,
allowed_actions: list[str] | None = None,
rate_limit_per_minute: int | None = None,
) -> dict
# Returns: {"session_key": "sk_...", "expires_at": ms, "max_spend": N}
Payment Streams
start_stream(receiver, rate_per_second, max_budget, service_id=None)
async def start_stream(
self,
receiver: str,
rate_per_second: int,
max_budget: int,
service_id: str | None = None,
) -> dict
# Returns: {"stream_id": "...", "state": "active", "rate_per_second": N, ...}
stop_stream(stream_id) / get_stream(stream_id)
async def stop_stream(self, stream_id: str) -> dict
async def get_stream(self, stream_id: str) -> dict
top_providers(limit=None, min_score=None, category=None)
async def top_providers(
self,
limit: int | None = None,
min_score: float | None = None,
category: str | None = None,
) -> dict
# Returns: {"providers": [...], "total": N}
Error Handling
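The Python SDK does not define its own exception types. Catch httpx.HTTPStatusError for HTTP error responses and httpx.RequestError for network-level failures: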
import httpx
from rivellum_ai import RivellumClient
async with RivellumClient("https://rpc.rivellum.network") as client:
    try:
        bal = await client.get_balance("invalid_address")
    except httpx.HTTPStatusError as e:
        print(f"HTTP {e.response.status_code}: {e.response.text}")
    except httpx.RequestError as e:
        print(f"Network error: {e}")
Full Example
import asyncio
from rivellum_ai import RivellumClient, RivellumAgent
async def main():
    # === Chain queries ===
    async with RivellumClient("https://rpc.rivellum.network") as client:
        if not await client.is_healthy():
            raise RuntimeError("Node not healthy")
        # Balance
        bal = await client.get_balance("a1b2...64hex")
        print(f"Balance: {bal['balance']}, Nonce: {bal['nonce']}")
        # Submit an envelope
        result = await client.submit_envelope({
            "sender": "a1b2...64hex",
            "nonce": bal["nonce"],
            "lane_hint": 3,
            "epoch_id": 100,
            "max_fee": "5000",
            "encrypted_payload": "deadbeef...",
            "signature": "sig...",
        })
        print(f"Submitted: {result['envelope_id']} ({result['status']})")
        # Poll until finalized
        while True:
            status = await client.get_envelope_status(result["envelope_id"])
            if not status:
                break
            if status["status"] in ("finalized", "rejected"):
                break
            await asyncio.sleep(2)
        # Query recent transfers
        events = await client.query_events(topic="native.transfer", limit=10)
        for e in events["events"]:
            print(f"Transfer: {e}")
    # === AI agent workflow ===
    async with RivellumAgent(
        node_url="https://rpc.rivellum.network",
        session_key="sk_your_key"
    ) as agent:
        # Find a compute service
        services = await agent.search_market(category="compute", min_reputation=85)
        svc = services["results"][0]
        # Create escrow for a job
        job = await agent.escrow(
            provider=svc["provider"],
            amount=int(svc["price"]) * 10,
            description="Batch inference job",
            ttl_secs=3600,
        )
        print(f"Escrow: {job['escrow_id']}")
        # Release on completion
        await agent.release_escrow(job["escrow_id"])
asyncio.run(main())