Skip to content

Python SDK

The Hyra Network Python SDK provides a comprehensive toolkit for developers to interact with the Hyra Network, enabling AI model deployment, task processing, and network participation.

  • Python 3.8 or higher
  • pip package manager
  • Git (for development)
Terminal window
# Install the published SDK package with pip
pip install hyra-network
Terminal window
# Install from a source checkout in editable mode (-e), so local changes take effect without reinstalling
git clone https://github.com/hyra-network/hyra-python-sdk.git
cd hyra-python-sdk
pip install -e .
Terminal window
# Install from a source checkout in editable mode, including the optional "dev" extras
git clone https://github.com/hyra-network/hyra-python-sdk.git
cd hyra-python-sdk
pip install -e ".[dev]"
from hyra_network import HyraClient
from hyra_network.config import Config

# Initialize client
# The key and address below are placeholders; substitute your own values.
config = Config(
    network_url="https://testnet.hyra.network",
    private_key="your_private_key",
    wallet_address="your_wallet_address",
)
client = HyraClient(config)

# Check network connection
if client.is_connected():
    print("Connected to Hyra Network")
else:
    print("Failed to connect to network")
from hyra_network.tasks import Task

# Describe the work to be done, then publish it through the client
task = Task(
    description="Train a sentiment analysis model",
    reward=1000,  # HYRA tokens
    deadline=86400,  # 24 hours
    data_hash="0x1234567890abcdef",
)
task_id = client.create_task(task)
print(f"Task created with ID: {task_id}")
# Get available tasks
available_tasks = client.get_available_tasks()
print(f"Found {len(available_tasks)} available tasks")

# Claim a task
# Only attempt a claim when at least one task was returned.
if available_tasks:
    task = available_tasks[0]
    success = client.claim_task(task.id)
    if success:
        print(f"Successfully claimed task {task.id}")
# Process claimed task
def process_task(task_data):
    """Run the model on ``task_data`` and return its prediction.

    ``your_ai_model`` is a placeholder that this snippet does not define;
    replace it with your own model object.
    """
    # Your AI processing logic here
    result = your_ai_model.predict(task_data)
    return result


# Get task data
task_data = client.get_task_data(task_id)

# Process task
result = process_task(task_data)

# Submit result
client.submit_task_result(task_id, result)
from hyra_network.models import Model

# Describe the model's metadata
model = Model(
    name="Sentiment Analysis Model",
    description="A model for sentiment analysis",
    category="NLP",
    framework="PyTorch",
    architecture="BERT",
    parameters=110000000,  # 110M parameters
    input_format="text",
    output_format="sentiment_score",
)

# Hand the description to the client, which returns the model's ID
model_id = client.register_model(model)
print(f"Model registered with ID: {model_id}")
# Roll the registered model out as an inference deployment
deployment = client.deploy_model(
    model_id=model_id,
    deployment_config={
        "replicas": 3,
        "resources": {
            "cpu": "2",
            "memory": "4Gi",
            "gpu": "1",
        },
    },
)
print(f"Model deployed with ID: {deployment.id}")
# Send one input to the deployment and read the score off the result
result = client.infer(
    model_id=model_id,
    input_data="This is a great product!",
    deployment_id=deployment.id,
)
print(f"Sentiment score: {result.sentiment_score}")
from hyra_network.nodes import ComputeNode

# Describe the node's capabilities, load limit, and resources
node = ComputeNode(
    capabilities=["AI", "ML", "NLP"],
    max_load=10,
    resources={
        "cpu": "8",
        "memory": "16Gi",
        "gpu": "2",
    },
)

# Register the node; the client returns its ID
node_id = client.register_node(node)
print(f"Node registered with ID: {node_id}")
# Start processing tasks
def task_processor(task):
    """Process one task by passing ``task.data`` to ``process_task``.

    ``process_task`` is not defined in this snippet; it must already be in
    scope.
    """
    # Process task
    result = process_task(task.data)
    return result


# Start processing loop
client.start_processing(task_processor)
from hyra_network.models import CustomModel
import torch
import torch.nn as nn


class MyCustomModel(nn.Module):
    """Minimal example network: a single linear layer mapping 10 features to 1 output."""

    def __init__(self):
        super().__init__()
        self.linear = nn.Linear(10, 1)

    def forward(self, x):
        """Apply the linear layer to ``x``."""
        return self.linear(x)


# Create custom model
# The class itself (not an instance) is passed as ``model_class``.
model = CustomModel(
    name="Custom Model",
    model_class=MyCustomModel,
    training_data="path/to/training/data",
    hyperparameters={
        "learning_rate": 0.001,
        "batch_size": 32,
        "epochs": 100,
    },
)

# Register custom model
model_id = client.register_custom_model(model)
import time

# Train model on Hyra Network
training_job = client.train_model(
    model_id=model_id,
    training_data="path/to/training/data",
    training_config={
        "epochs": 100,
        "batch_size": 32,
        "learning_rate": 0.001,
    },
)

# Monitor training progress
# Poll every 10 seconds until the job reports completion.
while not training_job.is_complete():
    progress = training_job.get_progress()
    print(f"Training progress: {progress.percentage}%")
    time.sleep(10)

print("Training completed!")
from hyra_network.federated import FederatedLearning

# Configure a federated learning session for the registered model
fl = FederatedLearning(
    model_id=model_id,
    participants=["node1", "node2", "node3"],
    aggregation_method="fedavg",
    communication_rounds=10,
)

# Kick the session off
fl.start()
# Join federated learning session
def local_training(data):
    """Callback passed to ``fl.participate``.

    ``trained_model`` is a placeholder that this snippet does not define;
    replace the body with training code that produces it from ``data``.
    """
    # Your local training logic
    return trained_model


# Participate in federated learning
fl.participate(local_training)
from hyra_network.privacy import HomomorphicEncryption
# Setup homomorphic encryption
he = HomomorphicEncryption()
# Encrypt data
# NOTE(review): `data` is not defined in this snippet; presumably supplied by the caller
encrypted_data = he.encrypt(data)
# Process encrypted data
# NOTE(review): `model` is not defined in this snippet; presumably a model created earlier -- confirm which type is expected
encrypted_result = he.process(encrypted_data, model)
# Decrypt result
result = he.decrypt(encrypted_result)
from hyra_network.privacy import DifferentialPrivacy
# Setup differential privacy
dp = DifferentialPrivacy(epsilon=1.0, delta=1e-5)
# Add noise to data
# NOTE(review): `data` is not defined in this snippet; presumably supplied by the caller
noisy_data = dp.add_noise(data)
# Process with privacy
# NOTE(review): `model` is not defined in this snippet and must expose a `predict` method
result = model.predict(noisy_data)
Terminal window
# Set environment variables (the key and address values are placeholders; substitute your own)
export HYRA_NETWORK_URL="https://testnet.hyra.network"
export HYRA_PRIVATE_KEY="your_private_key"
export HYRA_WALLET_ADDRESS="your_wallet_address"
config.yaml
# Network endpoint settings
network:
  url: "https://testnet.hyra.network"
  chain_id: 1

# Wallet credentials (placeholders; substitute your own values)
wallet:
  private_key: "your_private_key"
  address: "your_wallet_address"

# Logging level and output file
logging:
  level: "INFO"
  file: "hyra.log"
from hyra_network.config import load_config
# Load from file
config = load_config("config.yaml")
# Load from environment
# NOTE(review): this rebinds `config`, replacing the file-based value above; presumably the two calls are alternatives -- confirm
config = load_config()
# Create client with config
# NOTE: `HyraClient` is not imported in this snippet; it must already be in scope
client = HyraClient(config)
from hyra_network.exceptions import (
    HyraNetworkError,
    TaskNotFoundError,
    ModelNotFoundError,
    InsufficientFundsError,
)

# The two specific exceptions are handled first; HyraNetworkError is the
# final handler.
try:
    # Your code here
    result = client.process_task(task_id)
except TaskNotFoundError:
    print("Task not found")
except InsufficientFundsError:
    print("Insufficient funds")
except HyraNetworkError as e:
    print(f"Network error: {e}")
from hyra_network.utils import retry


@retry(max_attempts=3, delay=1.0)
def process_task_with_retry(task_id):
    """Process ``task_id`` through the client, wrapped by the SDK's ``retry`` decorator."""
    return client.process_task(task_id)


# Use retry decorator
result = process_task_with_retry(task_id)
import unittest

from hyra_network import HyraClient
from hyra_network.config import Config
from hyra_network.tasks import Task  # used by test_create_task


class TestHyraClient(unittest.TestCase):
    """Example unittest suite exercising a client configured for the testnet."""

    def setUp(self):
        # Build a fresh config and client before every test.
        self.config = Config(
            network_url="https://testnet.hyra.network",
            private_key="test_private_key",
            wallet_address="test_wallet_address",
        )
        self.client = HyraClient(self.config)

    def test_connection(self):
        self.assertTrue(self.client.is_connected())

    def test_create_task(self):
        task = Task(
            description="Test task",
            reward=100,
            deadline=3600,
            data_hash="0x123",
        )
        task_id = self.client.create_task(task)
        self.assertIsNotNone(task_id)


if __name__ == "__main__":
    unittest.main()
import pytest

from hyra_network import HyraClient
from hyra_network.config import Config  # used by the client fixture
from hyra_network.tasks import Task  # used by test_task_lifecycle


@pytest.fixture
def client():
    """Provide a client configured for the testnet with test credentials."""
    config = Config(
        network_url="https://testnet.hyra.network",
        private_key="test_private_key",
        wallet_address="test_wallet_address",
    )
    return HyraClient(config)


def test_task_lifecycle(client):
    """Walk one task through create, claim, process, and submit."""
    # Create task
    task = Task(description="Test", reward=100, deadline=3600, data_hash="0x123")
    task_id = client.create_task(task)

    # Claim task
    success = client.claim_task(task_id)
    assert success

    # Process task
    result = client.process_task(task_id)
    assert result is not None

    # Submit result
    success = client.submit_task_result(task_id, result)
    assert success
import asyncio

from hyra_network.async_client import AsyncHyraClient


async def process_tasks_async():
    """Fetch the available tasks and process them concurrently.

    Returns the list produced by ``asyncio.gather``, one entry per task.
    ``config`` is not defined in this snippet and must already be in scope.
    """
    client = AsyncHyraClient(config)

    # Get available tasks
    tasks = await client.get_available_tasks()

    # Process tasks concurrently
    results = await asyncio.gather(*[
        client.process_task(task.id) for task in tasks
    ])
    return results


# Run async operations
results = asyncio.run(process_tasks_async())
# Build ten tasks and create them in a single call
tasks = [
    Task(description=f"Task {i}", reward=100, deadline=3600, data_hash=f"0x{i}")
    for i in range(10)
]
task_ids = client.batch_create_tasks(tasks)
print(f"Created {len(task_ids)} tasks")

# Process every created task in a single call
results = client.batch_process_tasks(task_ids)
print(f"Processed {len(results)} tasks")
from hyra_network.monitoring import MetricsCollector

# Setup metrics collection
metrics = MetricsCollector()

# Track performance
# Only the call inside the ``with`` block is tracked under "task_processing".
with metrics.track("task_processing"):
    result = client.process_task(task_id)

# Get metrics
performance_metrics = metrics.get_metrics()
print(f"Average processing time: {performance_metrics.avg_processing_time}")
import logging
from hyra_network.logging import setup_logging
# Setup logging
setup_logging(level=logging.INFO, file="hyra.log")
# Use logger
logger = logging.getLogger("hyra_network")
# task_id is passed as a %-style argument, so the message is only formatted if the record is emitted
logger.info("Processing task %s", task_id)
import os

# Use secure configuration
# Credentials are read from the environment; os.getenv returns None when a
# variable is unset, so make sure both are exported before running.
config = Config(
    network_url="https://mainnet.hyra.network",
    private_key=os.getenv("HYRA_PRIVATE_KEY"),  # Never hardcode
    wallet_address=os.getenv("HYRA_WALLET_ADDRESS"),
)
# Validate inputs
def validate_task(task):
    """Check that ``task`` has a description and a positive reward.

    Returns:
        True when both checks pass.

    Raises:
        ValueError: if ``task.description`` is falsy, or if ``task.reward``
            is zero or negative.
    """
    if not task.description:
        raise ValueError("Task description required")
    if task.reward <= 0:
        raise ValueError("Task reward must be positive")
    return True
# Use context managers
# ``process_task`` is not defined in this snippet and must already be in scope.
with client.task_context(task_id) as task:
    result = process_task(task.data)
    task.submit_result(result)

# Cleanup resources
client.cleanup()
import time


def robust_task_processing(task_id):
    """Process ``task_id``, retrying up to three times with exponential backoff.

    Returns:
        Whatever ``client.process_task`` returns on the first successful attempt.

    Raises:
        Exception: the error from the final attempt is re-raised unchanged.
    """
    max_retries = 3
    for attempt in range(max_retries):
        try:
            return client.process_task(task_id)
        except Exception:
            # Out of attempts: let the last error propagate to the caller.
            if attempt == max_retries - 1:
                raise
            time.sleep(2 ** attempt)  # Exponential backoff
from typing import Any, List


# API reference stub: signatures and docstrings only, no implementation.
class HyraClient:
    def __init__(self, config: Config):
        """Initialize Hyra client with configuration."""

    def create_task(self, task: Task) -> str:
        """Create a new task."""

    def get_available_tasks(self) -> List[Task]:
        """Get available tasks."""

    def claim_task(self, task_id: str) -> bool:
        """Claim a task."""

    def process_task(self, task_id: str) -> Any:
        """Process a task."""

    def submit_task_result(self, task_id: str, result: Any) -> bool:
        """Submit task result."""
# API reference stub: signatures and docstrings only, no implementation.
class Task:
    def __init__(self, description: str, reward: int, deadline: int, data_hash: str):
        """Initialize task."""

    @property
    def id(self) -> str:
        """Get task ID."""

    @property
    def status(self) -> str:
        """Get task status."""
# API reference stub: signatures and docstrings only, no implementation.
class Model:
    def __init__(self, name: str, description: str, category: str, **kwargs):
        """Initialize model."""

    def register(self, client: HyraClient) -> str:
        """Register model."""

    def deploy(self, client: HyraClient, config: dict) -> str:
        """Deploy model."""
from hyra_network import HyraClient, Task, Model
from hyra_network.config import Config


def main():
    """End-to-end example: connect, register a model, create a task, and process it."""
    # Setup
    config = Config(
        network_url="https://testnet.hyra.network",
        private_key="your_private_key",
        wallet_address="your_wallet_address",
    )
    client = HyraClient(config)

    # Register model
    model = Model(
        name="Sentiment Analysis",
        description="Sentiment analysis model",
        category="NLP",
    )
    model_id = client.register_model(model)

    # Create task
    task = Task(
        description="Analyze sentiment of customer reviews",
        reward=1000,
        deadline=86400,
        data_hash="0x1234567890abcdef",
    )
    task_id = client.create_task(task)

    # Process task
    result = client.process_task(task_id)
    print(f"Task result: {result}")


if __name__ == "__main__":
    main()