Rivellum

Rivellum Portal

Download Wallet (Chrome)
Checking...
testnet

Python SDK (`rivellum-ai`)

The official Python async client for the Rivellum network. Provides two classes:

  • RivellumClient — full blockchain query and submission client
  • RivellumAgent — AI/agent-centric helper (payments, escrow, marketplace, streams)

Installation

pip install rivellum-ai
# or
uv add rivellum-ai

Requirements

  • Python 3.10+
  • httpx (HTTP client)

RivellumClient

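Full-featured async blockchain client. Most methods are async and return dicts or lists of dicts; exceptions are noted per method (for example, is_healthy() returns a bool, some lookups return None on 404, and contract() is a synchronous factory that returns a ContractHandle).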

Constructor

from rivellum_ai import RivellumClient

client = RivellumClient(
    node_url="https://rpc.rivellum.network",
    timeout=30.0,
)
| Parameter | Type | Default | Description |
|---|---|---|---|
| node_url | str | "https://api.rivellum.network" | Node RPC base URL |
| timeout | float | 30.0 | Request timeout in seconds |

Async context manager:

async with RivellumClient("https://rpc.rivellum.network") as client:
    balance = await client.get_balance("a1b2...64hex")
    print(balance["balance"])

Health & Info

health()

async def health(self) -> dict
# Returns: {"status": "ok", "service": "rivellum-node"}

is_healthy()

async def is_healthy(self) -> bool
# Returns True if health()["status"] == "ok"

get_chain_info()

async def get_chain_info(self) -> dict
# Returns: {
#   "version": "1.0.0",
#   "chain_id": "rivellum-mainnet",
#   "features": { "batching_enabled": True, "move_vm_enabled": True, ... }
# }

Account

get_balance(address)

async def get_balance(self, address: str) -> dict
# Returns: {
#   "address": "...",
#   "balance": "1000000000",
#   "nonce": 42,
#   "balances": {"assetId_hex": "amount"}  # Optional asset sub-balances
# }

get_nonce(address)

async def get_nonce(self, address: str) -> dict
# Returns: {"nonce": 42}

get_ledger_tip()

async def get_ledger_tip(self) -> dict
# Returns: {"global_height": 12345, "meta_root": "...hex..."}

Contract (Mist)

contract(contract_id) → ContractHandle

def contract(self, contract_id: str) -> ContractHandle

Returns a ContractHandle for the given contract address.

counter = client.contract("deadbeef...64hex")
result = await counter.call("increment", {"by": 5})
print(result["receipt_id"])

ContractHandle.call(function, args=None)

async def call(self, function: str, args: dict | None = None) -> dict
# Returns: {
#   "receipt_id": "...",
#   "contract_id": "...",
#   "function": "increment",
#   "status": "ok"
# }

ContractHandle.get_info()

async def get_info(self) -> dict
# Returns: {"contract_id": "...", "name": "...", "bytecode_hash": "...", "functions": {...}}

deploy_contract(bundle, name=None)

async def deploy_contract(self, bundle: Any, name: str | None = None) -> dict
# Returns: {"contract_id": "...", "receipt_id": "...", "status": "ok"}

get_receipt(receipt_id) / get_receipts()

async def get_receipt(self, receipt_id: str) -> dict
async def get_receipts(self) -> list[dict]

get_pvi(contract_id)

async def get_pvi(self, contract_id: str) -> dict

Returns the PVI (Public Verifiable Interface) specification for a contract.


Envelope Submission

All intent submission goes through encrypted envelopes. Plaintext submission is permanently disabled.

submit_envelope(envelope)

async def submit_envelope(self, envelope: dict) -> dict
# envelope fields: sender, nonce, lane_hint, epoch_id, max_fee, encrypted_payload, signature
# Returns: {"envelope_id": "...", "status": "accepted", "batch_window_id": "..."}

submit_envelope_batch(envelopes)

async def submit_envelope_batch(self, envelopes: list[dict]) -> dict
# Returns: {"accepted": N, "rejected": M}

get_envelope_status(envelope_id)

async def get_envelope_status(self, envelope_id: str) -> dict | None
# Returns None on 404
# Status values: received | admitted | batched | sealed | decrypted | ordered | executed | finalized | rejected
# Returns: {
#   "envelope_id": "...",
#   "status": "finalized",
#   "sender": "...",
#   "nonce": 42,
#   "lane_id": "...",
#   "epoch_id": 5,
#   "batch_id": "..."   # once batched
# }

get_batch_info(batch_id)

async def get_batch_info(self, batch_id: str) -> dict | None
# Returns None on 404
# Returns: {"batch_id": "...", "state": "sealed", "envelope_count": 120}

get_admission_ticket(intent_id)

async def get_admission_ticket(self, intent_id: str) -> dict | None
# Returns None on 404
# Returns: {"admission_id": "...", "status": "admitted", "sender": "...", "validated_nonce": 42}

Simulation

simulate_intent(intent)

async def simulate_intent(self, intent: dict) -> dict
# Returns: {"success": True, "gas_estimate": 500, "result": {...}}

Events & Ledger

get_recent_events()

async def get_recent_events(self) -> dict
# Returns: {"events": [...], "total": N}

query_events(topic=None, contract=None, address=None, intent_id=None, limit=50, from_cursor=None)

async def query_events(
    self,
    topic: str | None = None,
    contract: str | None = None,
    address: str | None = None,
    intent_id: str | None = None,
    limit: int = 50,
    from_cursor: str | None = None,
) -> dict
# Returns: {"events": [...], "next_cursor": "...", "total": N}
events = await client.query_events(topic="native.transfer", limit=20)
for e in events["events"]:
    print(f"Transfer at height {e['batch_height']}: {e['data']}")

# Paginate
while True:
    page = await client.query_events(topic="native.transfer", limit=100, from_cursor=cursor)
    events.extend(page["events"])
    cursor = page.get("next_cursor")
    if not cursor:
        break

PoUW

get_provers()

async def get_provers(self) -> list[dict]
# Returns: [{"prover_id": "...", "jobs_completed": 1000, "total_fee_earned": "...", ...}]

get_pending_jobs()

async def get_pending_jobs(self) -> list[dict]
# Returns: [{"job_id": "...", "tier": "High", "fee_budget": "...", "deadline_ms": ...}]

Faucet (Testnet)

faucet_mint(address, amount)

async def faucet_mint(self, address: str, amount: str) -> dict
# Returns: {"success": True, "tx_hash": "...", "minted": "1000000"}

Governance

get_governance_params() / get_governance_history()

async def get_governance_params(self) -> dict
# Returns epoch_length_slots, slot_duration_ms, etc.

async def get_governance_history(self) -> list[dict]
# Returns list of parameter change proposals

NFTs

async def get_nft(self, nft_id: str) -> dict
async def get_nfts_owned_by(self, address: str) -> dict
async def get_nft_transfer_history(self, nft_id: str) -> list[dict]
async def list_collections(self) -> dict
async def get_collection(self, collection_id: str) -> dict
async def get_collection_nfts(self, collection_id: str) -> dict

Prefabs

async def list_prefabs(self) -> list[dict]
async def get_prefab(self, prefab_id: str) -> dict

Bridge & XCM

async def get_bridge_status(self) -> dict
async def get_bridge_chains(self) -> list[dict]
async def get_bridge_messages(self) -> list[dict]
async def get_protocol_upgrades(self) -> list[dict]

Randomness

get_randomness_beacon()

async def get_randomness_beacon(self) -> dict
# Returns: {
#   "height": 12345,
#   "output": "hex...",
#   "proof": "hex...",
#   "producer": "addr...",
#   "timestamp_ms": 1700000000000,
#   "verified": True
# }

Constraint Engine

get_asset_constraints(asset_id) / get_freeze_status(asset_id, account)

async def get_asset_constraints(self, asset_id: str) -> dict
# Returns: {"asset_id": "...", "constrained": True, "policy_id": "...", "paused": False}

async def get_freeze_status(self, asset_id: str, account: str) -> dict
# Returns: {"frozen": False}

Move VM Endpoint Status

Public production APIs do not expose /move/* routes.

The Python SDK keeps the following methods for compatibility, but they intentionally fail fast with a clear error on production networks:

async def call_move_view(...)
async def get_move_module(...)
async def get_move_module_abi(...)
async def get_account_resources(...)
async def get_account_modules(...)

Use Mist and /v1/* RPC capabilities for public production workflows.


RivellumAgent

AI/agent-centric class with session key support and high-level payment primitives. Covers the same functionality as @rivellum/ai-sdk but in Python.

Constructor

from rivellum_ai import RivellumAgent

agent = RivellumAgent(
    node_url="https://rpc.rivellum.network",
    session_key="sk_...",
    timeout_ms=30000,
)

Async context manager:

async with RivellumAgent(node_url="https://rpc.rivellum.network", session_key="sk_...") as agent:
    await agent.pay(to="0xabc...", amount=1_000_000)

Payment Methods

pay(to, amount, max_slippage_bps=None, idempotency_key=None)

async def pay(
    self,
    to: str,
    amount: int,
    max_slippage_bps: int | None = None,
    idempotency_key: str | None = None,
) -> dict
# Returns: {"accepted": True, "intent_id": "...", "status": "...", "estimated_cost": N}

purchase(service_id, max_price, freshness_secs=None)

async def purchase(
    self,
    service_id: str,
    max_price: int,
    freshness_secs: int | None = None,
) -> dict

subscribe(service_id, max_price_per_period, periods)

async def subscribe(
    self,
    service_id: str,
    max_price_per_period: int,
    periods: int,
) -> dict

intent(action)

async def intent(self, action: dict) -> dict
# Raw intent — all fields pass through, session_key auto-injected

Escrow Methods

escrow(provider, amount, description, ttl_secs, verification=None)

async def escrow(
    self,
    provider: str,
    amount: int,
    description: str,
    ttl_secs: int,
    verification: str | None = None,
) -> dict
# Returns: {"escrow_id": "...", "state": "active", "amount": N, "expires_at": ms}

release_escrow(escrow_id, proof=None) / dispute_escrow(escrow_id, proof=None)

async def release_escrow(self, escrow_id: str, proof: str | None = None) -> dict
async def dispute_escrow(self, escrow_id: str, proof: str | None = None) -> dict

get_escrow(escrow_id)

async def get_escrow(self, escrow_id: str) -> dict
# Returns: {"escrow_id": "...", "state": "active|released|disputed|expired", ...}

Marketplace

search_market(category=None, max_price=None, min_reputation=None, query=None, limit=None)

async def search_market(
    self,
    category: str | None = None,
    max_price: int | None = None,
    min_reputation: float | None = None,
    query: str | None = None,
    limit: int | None = None,
) -> dict
# Returns: {"results": [{"service_id": "...", "name": "...", "price": "...", ...}], "total": N}

market_data(**kwargs) / market_compute(**kwargs) / market_models(**kwargs)

async def market_data(self, **kwargs) -> dict
async def market_compute(self, **kwargs) -> dict
async def market_models(self, **kwargs) -> dict

register_service(name, category, description, endpoint, pricing_model, price)

async def register_service(
    self,
    name: str,
    category: str,
    description: str,
    endpoint: str,
    pricing_model: str,
    price: int,
) -> dict
# Returns: {"service_id": "...", "status": "registered"}

Agent Wallet

create_agent(owner, spending_limit, permissions=None, expires_in_secs=None, kind=None, name=None)

async def create_agent(
    self,
    owner: str,
    spending_limit: int,
    permissions: list[str] | None = None,
    expires_in_secs: int | None = None,
    kind: str | None = None,
    name: str | None = None,
) -> dict
# Returns: {"agent_id": "...", "wallet_address": "...", "status": "..."}

get_agent(agent_id) / get_reputation(address) / get_budget(agent_id)

async def get_agent(self, agent_id: str) -> dict
async def get_reputation(self, address: str) -> dict
# Returns: {"address": "...", "score": 95, "success_rate": 0.99, ...}
async def get_budget(self, agent_id: str) -> dict
# Returns: {"total_budget": N, "spent": M, "remaining": K, ...}

create_session(agent_id, max_spend, expires_in_secs, allowed_actions=None, rate_limit_per_minute=None)

async def create_session(
    self,
    agent_id: str,
    max_spend: int,
    expires_in_secs: int,
    allowed_actions: list[str] | None = None,
    rate_limit_per_minute: int | None = None,
) -> dict
# Returns: {"session_key": "sk_...", "expires_at": ms, "max_spend": N}

Payment Streams

start_stream(receiver, rate_per_second, max_budget, service_id=None)

async def start_stream(
    self,
    receiver: str,
    rate_per_second: int,
    max_budget: int,
    service_id: str | None = None,
) -> dict
# Returns: {"stream_id": "...", "state": "active", "rate_per_second": N, ...}

stop_stream(stream_id) / get_stream(stream_id)

async def stop_stream(self, stream_id: str) -> dict
async def get_stream(self, stream_id: str) -> dict

top_providers(limit=None, min_score=None, category=None)

async def top_providers(
    self,
    limit: int | None = None,
    min_score: float | None = None,
    category: str | None = None,
) -> dict
# Returns: {"providers": [...], "total": N}

Error Handling

The Python SDK uses httpx.HTTPStatusError for HTTP error responses; transport-level failures (connection errors, timeouts) surface as httpx.RequestError:

import httpx
from rivellum_ai import RivellumClient

async with RivellumClient("https://rpc.rivellum.network") as client:
    try:
        bal = await client.get_balance("invalid_address")
    except httpx.HTTPStatusError as e:
        print(f"HTTP {e.response.status_code}: {e.response.text}")
    except httpx.RequestError as e:
        print(f"Network error: {e}")

Full Example

import asyncio
from rivellum_ai import RivellumClient, RivellumAgent

async def main():
    """Run an end-to-end demo: chain queries, then an AI agent escrow workflow.

    Part 1 uses RivellumClient to check node health, read a balance, submit
    an envelope, poll its status, and query recent transfer events.
    Part 2 uses RivellumAgent to search the marketplace, open an escrow with
    the first matching provider, and release it.

    Raises:
        RuntimeError: if the node reports unhealthy, or if the marketplace
            search returns no services.
    """
    # === Chain queries ===
    async with RivellumClient("https://rpc.rivellum.network") as client:
        if not await client.is_healthy():
            raise RuntimeError("Node not healthy")

        # Balance
        bal = await client.get_balance("a1b2...64hex")
        print(f"Balance: {bal['balance']}, Nonce: {bal['nonce']}")

        # Submit an envelope
        result = await client.submit_envelope({
            "sender": "a1b2...64hex",
            "nonce": bal["nonce"],
            "lane_hint": 3,
            "epoch_id": 100,
            "max_fee": "5000",
            "encrypted_payload": "deadbeef...",
            "signature": "sig...",
        })
        print(f"Submitted: {result['envelope_id']} ({result['status']})")

        # Poll until the envelope is finalized or rejected, or the node no
        # longer knows it (get_envelope_status returns None on 404).
        # The loop is bounded (30 polls x 2 s) so an envelope that never
        # reaches a terminal state cannot hang the script forever.
        for _ in range(30):
            status = await client.get_envelope_status(result["envelope_id"])
            if not status or status["status"] in ("finalized", "rejected"):
                break
            await asyncio.sleep(2)

        # Query recent transfers
        events = await client.query_events(topic="native.transfer", limit=10)
        for e in events["events"]:
            print(f"Transfer: {e}")

    # === AI agent workflow ===
    async with RivellumAgent(
        node_url="https://rpc.rivellum.network",
        session_key="sk_your_key"
    ) as agent:
        # Find a compute service
        services = await agent.search_market(category="compute", min_reputation=85)
        # An empty result list would otherwise fail with a bare IndexError.
        if not services["results"]:
            raise RuntimeError("No matching compute services found")
        svc = services["results"][0]

        # Create escrow for a job
        job = await agent.escrow(
            provider=svc["provider"],
            amount=int(svc["price"]) * 10,
            description="Batch inference job",
            ttl_secs=3600,
        )
        print(f"Escrow: {job['escrow_id']}")

        # Release on completion
        await agent.release_escrow(job["escrow_id"])

asyncio.run(main())