Skip to content

Client Development Guide

Master building robust plugin clients with Pyvider RPC Plugin. Learn the fundamentals of client setup, configuration, and lifecycle management for production applications.

๐Ÿค– AI-Generated Content

This documentation was generated with AI assistance and is still being audited. Some, or potentially a lot, of this information may be inaccurate. Learn more.

Quick Start

Here's a minimal plugin client to get you started:

import asyncio
from pyvider.rpcplugin import plugin_client
from calculator_pb2_grpc import CalculatorStub
from calculator_pb2 import AddRequest

async def main():
    # Connect to plugin server with automatic lifecycle management
    async with plugin_client(command=["python", "calculator.py"]) as client:
        await client.start()

        # Create gRPC stub from client's channel
        stub = CalculatorStub(client.grpc_channel)

        # Make RPC call
        request = AddRequest(a=5, b=3)
        result = await stub.Add(request)
        print(f"Result: {result.result}")  # Result: 8

if __name__ == "__main__":
    asyncio.run(main())

Using gRPC Stubs

The client provides a grpc_channel for making RPC calls. You must use generated gRPC stubs - the client doesn't provide automatic service proxies.

Client Architecture

The plugin client consists of several integrated components:

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚ Plugin Client                                                   โ”‚
โ”‚                                                                 โ”‚
โ”‚  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”   โ”‚
โ”‚  โ”‚ Service         โ”‚ โ”‚ Connection      โ”‚ โ”‚ Process         โ”‚   โ”‚
โ”‚  โ”‚ Discovery       โ”‚ โ”‚ Management      โ”‚ โ”‚ Management      โ”‚   โ”‚
โ”‚  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜   โ”‚
โ”‚                                                                 โ”‚
โ”‚  โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”   โ”‚
โ”‚  โ”‚ gRPC Channel    โ”‚ โ”‚ Authentication  โ”‚ โ”‚ Error Handling  โ”‚   โ”‚
โ”‚  โ”‚ Management      โ”‚ โ”‚ & Security      โ”‚ โ”‚ & Retry Logic   โ”‚   โ”‚
โ”‚  โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜   โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

Understanding the client lifecycle helps with proper resource management:

  1. Process Launch - Start plugin server subprocess
  2. Transport Discovery - Negotiate optimal transport (Unix/TCP)
  3. Authentication - Magic cookie validation and mTLS handshake
  4. Service Discovery - Discover available gRPC services
  5. Channel Ready - gRPC channel established for RPC calls
  6. Active Communication - Normal RPC operations
  7. Graceful Shutdown - Clean process termination and resource cleanup
class ClientLifecycleExample:
    async def demonstrate_lifecycle(self):
        """Demonstrate complete client lifecycle."""

        # 1. Create client (no connection yet)
        client = plugin_client(command=["python", "calculator.py"])

        try:
            # 2-4. Start connection (launches process, performs handshake, discovers services)
            await client.start()
            print("โœ… Client connected and authenticated")

            # 5-6. Make RPC calls
            stub = CalculatorStub(client.grpc_channel)
            result = await stub.Add(AddRequest(a=10, b=5))
            print(f"๐Ÿ”ข Calculation result: {result.result}")

        finally:
            # 7. Clean shutdown
            await client.close()
            print("๐Ÿ”Œ Client disconnected and process terminated")

Client Patterns

Automatic lifecycle management with context manager:

async def context_manager_example():
    """Recommended pattern for automatic cleanup."""

    async with plugin_client(command=["python", "calculator.py"]) as client:
        await client.start()

        # Use client
        stub = CalculatorStub(client.grpc_channel)
        result = await stub.Add(AddRequest(a=10, b=5))
        print(f"Result: {result.result}")

    # Client automatically closed on context exit
    print("Client disconnected")

Manual control over client lifecycle:

async def manual_client_example():
    """Example of manual client lifecycle management."""

    # Create client (no connection yet)
    client = plugin_client(command=["python", "calculator.py"])

    try:
        # Start connection manually
        await client.start()
        print("โœ… Client connected")

        # Use client
        stub = CalculatorStub(client.grpc_channel)
        result = await stub.Add(AddRequest(a=10, b=5))
        print(f"Calculation: {result.result}")

    finally:
        # Always close client
        await client.close()
        print("๐Ÿ”Œ Client disconnected")

Configuration

Configuration Flow

To configure the CLIENT: Set environment variables in your own process

To configure the PLUGIN: Use config={"env": {...}}

Set environment variables in your process to configure client behavior:

import os
from pyvider.rpcplugin import plugin_client

# Configure client retry behavior
os.environ["PLUGIN_CLIENT_MAX_RETRIES"] = "5"
os.environ["PLUGIN_CONNECTION_TIMEOUT"] = "30"
os.environ["PLUGIN_CLIENT_BACKOFF_MULTIPLIER"] = "2.0"

# Create client (uses environment variables for client config)
client = plugin_client(command=["python", "plugin.py"])

Available client environment variables: - PLUGIN_CLIENT_MAX_RETRIES - Maximum connection retry attempts (default: 3) - PLUGIN_CONNECTION_TIMEOUT - Connection timeout in seconds (default: 10) - PLUGIN_CLIENT_BACKOFF_MULTIPLIER - Exponential backoff multiplier (default: 2.0) - PLUGIN_CLIENT_BACKOFF_MAX - Maximum backoff delay in seconds (default: 60)

Configure the plugin subprocess via config parameter:

from pyvider.rpcplugin import plugin_client

# Configure plugin server behavior
client = plugin_client(
    command=["python", "calculator.py"],
    config={
        "env": {
            # Plugin subprocess configuration
            "PLUGIN_LOG_LEVEL": "debug",
            "PLUGIN_AUTO_MTLS": "true",
            "PLUGIN_RATE_LIMIT_ENABLED": "true",
            "PLUGIN_RATE_LIMIT_REQUESTS_PER_SECOND": "100.0",
            "PLUGIN_HEALTH_SERVICE_ENABLED": "true",
        }
    }
)

await client.start()

Configuration Flow:

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”        โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚   Your Process (Client)     โ”‚        โ”‚  Plugin Subprocess (Server) โ”‚
โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค        โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
โ”‚ Set environment variables:  โ”‚        โ”‚ Receives environment vars:  โ”‚
โ”‚ - os.environ["PLUGIN_..."]  โ”‚        โ”‚ - From config={"env": ...}  โ”‚
โ”‚ - PLUGIN_CLIENT_MAX_RETRIES โ”‚        โ”‚ - PLUGIN_LOG_LEVEL          โ”‚
โ”‚ - PLUGIN_CONNECTION_TIMEOUT โ”‚        โ”‚ - PLUGIN_AUTO_MTLS          โ”‚
โ”‚                             โ”‚        โ”‚ - PLUGIN_SERVER_PORT        โ”‚
โ”‚ Configures CLIENT behavior  โ”‚  -->   โ”‚ Configures SERVER behavior  โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜        โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

Configure mTLS for secure communication:

import os

# Configure client to verify server certificates
os.environ["PLUGIN_CLIENT_VERIFY_SERVER"] = "true"
os.environ["PLUGIN_CA_CERT"] = "file:///etc/ssl/certs/ca.crt"

# Configure plugin server with certificates
client = plugin_client(
    command=["python", "secure_plugin.py"],
    config={
        "env": {
            "PLUGIN_AUTO_MTLS": "true",
            "PLUGIN_SERVER_CERT": "file:///etc/ssl/certs/server.crt",
            "PLUGIN_SERVER_KEY": "file:///etc/ssl/private/server.key",
            "PLUGIN_CA_CERT": "file:///etc/ssl/certs/ca.crt",
        }
    }
)

await client.start()

Advanced Patterns

Multiple Service Clients

Manage multiple plugin connections:

class MultiPluginClient:
    def __init__(self):
        self.plugins = {}

    async def add_plugin(self, name: str, command: list[str]):
        """Add a plugin connection."""
        client = plugin_client(command=command)
        await client.start()
        self.plugins[name] = client
        return client

    async def close_all(self):
        """Close all plugin connections."""
        for name, client in self.plugins.items():
            await client.close()
            print(f"Closed {name}")

# Usage
async def multi_plugin_example():
    manager = MultiPluginClient()

    try:
        # Add multiple plugins
        calc = await manager.add_plugin("calculator", ["python", "calc.py"])
        db = await manager.add_plugin("database", ["python", "db.py"])

        # Use plugins
        calc_stub = CalculatorStub(calc.grpc_channel)
        result = await calc_stub.Add(AddRequest(a=5, b=3))

    finally:
        await manager.close_all()

Connection Pooling

Reuse connections efficiently:

from typing import Dict
from asyncio import Lock

class ClientPool:
    def __init__(self, command: list[str], pool_size: int = 5):
        self.command = command
        self.pool_size = pool_size
        self.clients: list = []
        self.available: list = []
        self.lock = Lock()

    async def initialize(self):
        """Initialize connection pool."""
        for _ in range(self.pool_size):
            client = plugin_client(command=self.command)
            await client.start()
            self.clients.append(client)
            self.available.append(client)

    async def acquire(self):
        """Get a client from the pool."""
        async with self.lock:
            while not self.available:
                await asyncio.sleep(0.1)
            return self.available.pop()

    async def release(self, client):
        """Return a client to the pool."""
        async with self.lock:
            self.available.append(client)

    async def close_all(self):
        """Close all connections."""
        for client in self.clients:
            await client.close()

# Usage
async def pool_example():
    pool = ClientPool(command=["python", "calculator.py"], pool_size=5)
    await pool.initialize()

    try:
        # Acquire client from pool
        client = await pool.acquire()

        try:
            stub = CalculatorStub(client.grpc_channel)
            result = await stub.Add(AddRequest(a=10, b=5))
            print(f"Result: {result.result}")
        finally:
            # Return to pool
            await pool.release(client)

    finally:
        await pool.close_all()

Best Practices

1. Always Use Context Managers

Prefer async with for automatic resource cleanup:

# Good
async with plugin_client(command=["python", "plugin.py"]) as client:
    await client.start()
    # Use client

# Avoid
client = plugin_client(command=["python", "plugin.py"])
await client.start()
# Easy to forget cleanup

2. Configure Timeouts

Set appropriate timeouts for your use case:

import os

# Short timeout for local plugins
os.environ["PLUGIN_CONNECTION_TIMEOUT"] = "5"

# Longer timeout for remote/slow plugins
os.environ["PLUGIN_CONNECTION_TIMEOUT"] = "30"

3. Handle Startup Failures

Always handle connection failures:

from pyvider.rpcplugin.exception import TransportError, HandshakeError

try:
    async with plugin_client(command=["python", "plugin.py"]) as client:
        await client.start()
        # Use client
except (TransportError, HandshakeError) as e:
    logger.error(f"Failed to connect: {e.message}")
    # Implement fallback or retry logic

4. Clean Resource Management

Ensure proper cleanup even with exceptions:

client = None
try:
    client = plugin_client(command=["python", "plugin.py"])
    await client.start()
    # Use client
except Exception as e:
    logger.error(f"Error: {e}")
    raise
finally:
    if client:
        await client.close()

Troubleshooting

Connection Issues

Problem: Client fails to connect to plugin

# Check plugin process status
try:
    await client.start()
except TransportError as e:
    logger.error(f"Connection failed: {e.message}")
    # Check if plugin process actually started
    # Check logs for plugin subprocess errors

Timeout Issues

Problem: Connection times out

# Increase timeout for slow-starting plugins
os.environ["PLUGIN_CONNECTION_TIMEOUT"] = "60"

# Or check if plugin is actually responding
import subprocess
result = subprocess.run(["python", "plugin.py", "--version"], capture_output=True)
print(result.stdout)  # Verify plugin executable works

Port Conflicts

Problem: Port already in use

# Use automatic port assignment in plugin server
# In plugin server code:
server = plugin_server(
    protocol=protocol,
    handler=handler,
    transport="tcp",
    port=0  # Automatic port assignment
)

Next Steps