You've built your RPA script. It works beautifully on your local machine. Now you need to deploy it to a VM that runs 24/7, handles failures gracefully, and doesn't require you to RDP in every morning to check if it's still alive.
Here's how to set up Windows VMs for RPA the right way with proper networking, monitoring, health checks, and bulletproof process management.
For RPA workloads, you want at least 2 vCPUs and 4 GB of RAM, a Windows Server image, and SSD-backed storage; the instance sizes below all fit that profile.
Provider recommendations:
AWS EC2: t3.medium or t3.large Windows instances. Easy spot instance support for cost savings.
Azure: B2s or B2ms VMs. Native Windows environment, good RDP experience.
Google Cloud: e2-medium with Windows Server. Solid performance, competitive pricing.
DigitalOcean: Simple droplets starting at $24/month for Windows. Great for smaller workloads.
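If you're scripting VM provisioning on AWS, a minimal boto3 sketch looks like this (the AMI ID and key pair name are placeholders; look up the current Windows Server AMI for your region):
import boto3

ec2 = boto3.client('ec2', region_name='us-east-1')

# Launch a t3.medium Windows instance (AMI ID and key name are placeholders)
response = ec2.run_instances(
    ImageId='ami-0123456789abcdef0',  # hypothetical Windows Server 2022 AMI
    InstanceType='t3.medium',
    MinCount=1,
    MaxCount=1,
    KeyName='your-key-pair',  # needed later to decrypt the Administrator password
    TagSpecifications=[{
        'ResourceType': 'instance',
        'Tags': [{'Key': 'Name', 'Value': 'rpa-vm-01'}],
    }],
)
print(response['Instances'][0]['InstanceId'])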
Once your VM is running, here's the setup checklist:
1. Set a strong password and enable RDP access (temporarily)
# From your local machine, RDP in
mstsc /v:your-vm-ip
2. Windows Updates (get this pain over with early)
# Check for updates
Start-Process ms-settings:windowsupdate
# Or via PowerShell
Install-Module PSWindowsUpdate
Get-WindowsUpdate
Install-WindowsUpdate -AcceptAll -AutoReboot
3. Install essential software
# Install Chocolatey (Windows package manager)
Set-ExecutionPolicy Bypass -Scope Process -Force
[System.Net.ServicePointManager]::SecurityProtocol = [System.Net.ServicePointManager]::SecurityProtocol -bor 3072
iex ((New-Object System.Net.WebClient).DownloadString('https://chocolatey.org/install.ps1'))
# Install Python
choco install python -y
# Install Git
choco install git -y
# Refresh environment variables
refreshenv
# Verify installations
python --version
git --version
4. Disable unnecessary services to save resources
# Disable Windows Search (RPA VMs don't need it)
Stop-Service "WSearch" -Force
Set-Service "WSearch" -StartupType Disabled
# Disable Windows Update during business hours (configure maintenance windows instead)
# We'll handle updates manually during off-hours
5. Configure Windows Firewall
# We'll open port 5000 for our Flask API (change as needed)
New-NetFirewallRule -DisplayName "RPA API" -Direction Inbound -Protocol TCP -LocalPort 5000 -Action Allow
# Verify the rule
Get-NetFirewallRule -DisplayName "RPA API"
6. Set up a dedicated service account
# Create a service account for running RPA scripts
$Password = ConvertTo-SecureString "YourStrongPassword123!" -AsPlainText -Force
New-LocalUser "RPAService" -Password $Password -FullName "RPA Service Account" -Description "Account for running RPA automations"
# Add to appropriate groups
Add-LocalGroupMember -Group "Users" -Member "RPAService"
# Grant logon as service right (needed for NSSM later)
# This requires secpol.msc or a script; we set it when creating the service with NSSM below
Your RPA scripts shouldn't just run blindly. You need a way to start and stop them remotely, check whether they're still running, read their logs, and see how healthy the VM is.
Create a Flask API to expose these capabilities securely.
Create a project structure:
C:\RPA\
├── api\
│ ├── app.py
│ ├── config.py
│ ├── auth.py
│ └── requirements.txt
├── scripts\
│ ├── your_rpa_script.py
│ └── process_invoices.py
├── logs\
└── data\
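A quick way to create that layout on the VM (a small one-off helper):
import os

# Create the RPA directory layout
for d in [r'C:\RPA\api', r'C:\RPA\scripts', r'C:\RPA\logs', r'C:\RPA\data']:
    os.makedirs(d, exist_ok=True)
    print(f'Created {d}')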
requirements.txt (waitress is included here because we deploy with it later):
flask==3.0.0
flask-cors==4.0.0
python-dotenv==1.0.0
waitress==2.1.2
opentelemetry-api==1.21.0
opentelemetry-sdk==1.21.0
opentelemetry-exporter-otlp==1.21.0
opentelemetry-instrumentation-flask==0.42b0
psutil==5.9.6
config.py:
import os
from dotenv import load_dotenv
load_dotenv()
class Config:
# Security
API_KEY = os.getenv('API_KEY', 'change-this-in-production')
SECRET_KEY = os.getenv('SECRET_KEY', 'another-secret-key')
# Paths
SCRIPTS_DIR = r'C:\RPA\scripts'
LOGS_DIR = r'C:\RPA\logs'
# OpenTelemetry
    OTEL_ENDPOINT = os.getenv('OTEL_ENDPOINT', 'http://localhost:4317')  # gRPC OTLP port; 4318 is the HTTP port
SERVICE_NAME = os.getenv('SERVICE_NAME', 'rpa-vm-api')
auth.py:
import hmac
from functools import wraps
from flask import request, jsonify
from config import Config
def require_api_key(f):
"""Decorator to require API key authentication"""
@wraps(f)
def decorated_function(*args, **kwargs):
api_key = request.headers.get('X-API-Key')
if not api_key:
return jsonify({'error': 'API key required'}), 401
        # Constant-time comparison avoids leaking key bytes through timing
        if not hmac.compare_digest(api_key, Config.API_KEY):
            return jsonify({'error': 'Invalid API key'}), 403
return f(*args, **kwargs)
return decorated_function
app.py:
from flask import Flask, jsonify, request
from flask_cors import CORS
import subprocess
import sys
import psutil
import os
import json
from datetime import datetime
from pathlib import Path
import logging
from config import Config
from auth import require_api_key
# OpenTelemetry setup
from opentelemetry import trace, metrics
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.instrumentation.flask import FlaskInstrumentor
from opentelemetry.sdk.resources import Resource
# Initialize OpenTelemetry
resource = Resource.create({"service.name": Config.SERVICE_NAME})
# Tracing
trace_provider = TracerProvider(resource=resource)
otlp_trace_exporter = OTLPSpanExporter(endpoint=Config.OTEL_ENDPOINT)
trace_provider.add_span_processor(BatchSpanProcessor(otlp_trace_exporter))
trace.set_tracer_provider(trace_provider)
# Metrics
metric_reader = PeriodicExportingMetricReader(
OTLPMetricExporter(endpoint=Config.OTEL_ENDPOINT)
)
metrics.set_meter_provider(MeterProvider(resource=resource, metric_readers=[metric_reader]))
# Flask app
app = Flask(__name__)
app.config.from_object(Config)
CORS(app)
# Instrument Flask with OpenTelemetry
FlaskInstrumentor().instrument_app(app)
# Set up logging (create the log directory first so the FileHandler doesn't fail)
os.makedirs(Config.LOGS_DIR, exist_ok=True)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(os.path.join(Config.LOGS_DIR, 'api.log')),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# Track running processes (in-memory, so this state is lost if the API restarts)
running_processes = {}
@app.route('/health', methods=['GET'])
def health_check():
"""
Public health check endpoint
Returns system status and basic metrics
"""
try:
# Get system metrics
cpu_percent = psutil.cpu_percent(interval=1)
memory = psutil.virtual_memory()
disk = psutil.disk_usage('C:\\')
# Check if any scripts are running
scripts_running = len(running_processes)
health_status = {
'status': 'healthy',
'timestamp': datetime.now().isoformat(),
'system': {
'cpu_percent': cpu_percent,
'memory_percent': memory.percent,
'memory_available_gb': round(memory.available / (1024**3), 2),
'disk_percent': disk.percent,
'disk_free_gb': round(disk.free / (1024**3), 2)
},
'rpa': {
'scripts_running': scripts_running,
'scripts': list(running_processes.keys())
}
}
# Set unhealthy if resources are critically low
if cpu_percent > 95 or memory.percent > 95 or disk.percent > 90:
health_status['status'] = 'unhealthy'
health_status['warnings'] = []
if cpu_percent > 95:
health_status['warnings'].append('CPU usage critical')
if memory.percent > 95:
health_status['warnings'].append('Memory usage critical')
if disk.percent > 90:
health_status['warnings'].append('Disk space low')
status_code = 200 if health_status['status'] == 'healthy' else 503
return jsonify(health_status), status_code
except Exception as e:
logger.error(f"Health check failed: {e}")
return jsonify({
'status': 'unhealthy',
'error': str(e),
'timestamp': datetime.now().isoformat()
}), 503
@app.route('/api/scripts', methods=['GET'])
@require_api_key
def list_scripts():
"""List all available RPA scripts"""
try:
scripts_dir = Path(Config.SCRIPTS_DIR)
scripts = []
for script_file in scripts_dir.glob('*.py'):
# Get script metadata if available
script_info = {
'name': script_file.stem,
'filename': script_file.name,
'path': str(script_file),
'size_kb': round(script_file.stat().st_size / 1024, 2),
'modified': datetime.fromtimestamp(script_file.stat().st_mtime).isoformat(),
'is_running': script_file.stem in running_processes
}
scripts.append(script_info)
return jsonify({
'scripts': scripts,
'count': len(scripts)
})
except Exception as e:
logger.error(f"Error listing scripts: {e}")
return jsonify({'error': str(e)}), 500
@app.route('/api/scripts/<script_name>/start', methods=['POST'])
@require_api_key
def start_script(script_name):
"""Start an RPA script"""
try:
        if script_name in running_processes:
            # Drop stale entries for processes that already exited
            if running_processes[script_name]['process'].poll() is not None:
                del running_processes[script_name]
            else:
                return jsonify({
                    'error': f'Script {script_name} is already running',
                    'pid': running_processes[script_name]['pid']
                }), 400
script_path = Path(Config.SCRIPTS_DIR) / f'{script_name}.py'
if not script_path.exists():
return jsonify({'error': f'Script {script_name} not found'}), 404
        # Get parameters from the request body (silent=True tolerates a missing
        # or non-JSON body instead of raising a 415)
        params = (request.get_json(silent=True) or {}).get('parameters', {})
        # Start the script as a subprocess, using the same interpreter that runs the API
        cmd = [sys.executable, str(script_path)]
        # Add parameters as command line args
        for key, value in params.items():
            cmd.extend([f'--{key}', str(value)])
        logger.info(f"Starting script: {script_name} with command: {' '.join(cmd)}")
        # Send output to the script's log file (read back by /api/logs);
        # piping to subprocess.PIPE without draining it can hang long-running
        # scripts once the OS pipe buffer fills
        log_handle = open(Path(Config.LOGS_DIR) / f'{script_name}.log', 'a')
        process = subprocess.Popen(
            cmd,
            stdout=log_handle,
            stderr=subprocess.STDOUT,
            cwd=Config.SCRIPTS_DIR
        )
# Track the process
running_processes[script_name] = {
'pid': process.pid,
'started_at': datetime.now().isoformat(),
'process': process,
'parameters': params
}
logger.info(f"Script {script_name} started with PID {process.pid}")
return jsonify({
'message': f'Script {script_name} started successfully',
'pid': process.pid,
'started_at': running_processes[script_name]['started_at']
}), 200
except Exception as e:
logger.error(f"Error starting script {script_name}: {e}")
return jsonify({'error': str(e)}), 500
@app.route('/api/scripts/<script_name>/stop', methods=['POST'])
@require_api_key
def stop_script(script_name):
"""Stop a running RPA script"""
try:
if script_name not in running_processes:
return jsonify({'error': f'Script {script_name} is not running'}), 400
process_info = running_processes[script_name]
process = process_info['process']
# Try graceful termination first
process.terminate()
try:
process.wait(timeout=10)
logger.info(f"Script {script_name} terminated gracefully")
except subprocess.TimeoutExpired:
# Force kill if it doesn't terminate
process.kill()
logger.warning(f"Script {script_name} force killed")
# Remove from tracking
del running_processes[script_name]
return jsonify({
'message': f'Script {script_name} stopped successfully',
'pid': process_info['pid']
}), 200
except Exception as e:
logger.error(f"Error stopping script {script_name}: {e}")
return jsonify({'error': str(e)}), 500
@app.route('/api/scripts/<script_name>/status', methods=['GET'])
@require_api_key
def script_status(script_name):
"""Get the status of a specific script"""
try:
if script_name not in running_processes:
return jsonify({
'script': script_name,
'is_running': False
})
process_info = running_processes[script_name]
process = process_info['process']
# Check if process is still alive
poll = process.poll()
if poll is not None:
# Process has finished
del running_processes[script_name]
return jsonify({
'script': script_name,
'is_running': False,
'exit_code': poll
})
# Process is still running
try:
proc = psutil.Process(process_info['pid'])
cpu_percent = proc.cpu_percent(interval=0.1)
memory_mb = proc.memory_info().rss / (1024 * 1024)
return jsonify({
'script': script_name,
'is_running': True,
'pid': process_info['pid'],
'started_at': process_info['started_at'],
'parameters': process_info.get('parameters', {}),
'resources': {
'cpu_percent': round(cpu_percent, 2),
'memory_mb': round(memory_mb, 2)
}
})
except psutil.NoSuchProcess:
del running_processes[script_name]
return jsonify({
'script': script_name,
'is_running': False
})
except Exception as e:
logger.error(f"Error getting script status: {e}")
return jsonify({'error': str(e)}), 500
@app.route('/api/logs/<script_name>', methods=['GET'])
@require_api_key
def get_logs(script_name):
"""Get recent logs for a script"""
try:
log_file = Path(Config.LOGS_DIR) / f'{script_name}.log'
if not log_file.exists():
return jsonify({'error': f'Log file for {script_name} not found'}), 404
# Get number of lines to return (default 100)
lines = request.args.get('lines', 100, type=int)
# Read last N lines
with open(log_file, 'r') as f:
all_lines = f.readlines()
recent_lines = all_lines[-lines:] if len(all_lines) > lines else all_lines
return jsonify({
'script': script_name,
'lines': recent_lines,
'total_lines': len(all_lines)
})
except Exception as e:
logger.error(f"Error reading logs: {e}")
return jsonify({'error': str(e)}), 500
@app.route('/api/metrics', methods=['GET'])
@require_api_key
def get_metrics():
"""Get detailed system and RPA metrics"""
try:
# System metrics
cpu_percent = psutil.cpu_percent(interval=1, percpu=True)
memory = psutil.virtual_memory()
disk = psutil.disk_usage('C:\\')
# Network I/O
net_io = psutil.net_io_counters()
# Running processes details
scripts_detail = []
for script_name, info in running_processes.items():
try:
proc = psutil.Process(info['pid'])
scripts_detail.append({
'name': script_name,
'pid': info['pid'],
'cpu_percent': proc.cpu_percent(interval=0.1),
'memory_mb': round(proc.memory_info().rss / (1024 * 1024), 2),
'started_at': info['started_at']
})
except psutil.NoSuchProcess:
pass
return jsonify({
'timestamp': datetime.now().isoformat(),
'system': {
'cpu_percent_per_core': cpu_percent,
'cpu_percent_avg': round(sum(cpu_percent) / len(cpu_percent), 2),
'memory': {
'total_gb': round(memory.total / (1024**3), 2),
'available_gb': round(memory.available / (1024**3), 2),
'used_gb': round(memory.used / (1024**3), 2),
'percent': memory.percent
},
'disk': {
'total_gb': round(disk.total / (1024**3), 2),
'used_gb': round(disk.used / (1024**3), 2),
'free_gb': round(disk.free / (1024**3), 2),
'percent': disk.percent
},
'network': {
'bytes_sent_mb': round(net_io.bytes_sent / (1024**2), 2),
'bytes_recv_mb': round(net_io.bytes_recv / (1024**2), 2)
}
},
'rpa': {
'scripts_running': len(running_processes),
'scripts': scripts_detail
}
})
except Exception as e:
logger.error(f"Error getting metrics: {e}")
return jsonify({'error': str(e)}), 500
if __name__ == '__main__':
# Ensure log directory exists
os.makedirs(Config.LOGS_DIR, exist_ok=True)
logger.info("Starting RPA Control API")
# Run Flask app
# In production, use a proper WSGI server like waitress
app.run(host='0.0.0.0', port=5000, debug=False)
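For reference, here's a minimal script skeleton that fits this setup: it accepts the --key value parameters the start endpoint passes, and logs to stdout, which the API redirects into C:\RPA\logs (the invoice logic itself is a placeholder):
# C:\RPA\scripts\process_invoices.py - minimal skeleton
import argparse
import logging
import sys

# Log to stdout/stderr; the API captures both into C:\RPA\logs\process_invoices.log
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')

def main():
    # The start endpoint passes parameters as --key value pairs
    parser = argparse.ArgumentParser()
    parser.add_argument('--batch_size', type=int, default=10)
    args = parser.parse_args()

    logging.info(f'Processing invoices in batches of {args.batch_size}')
    # ... your actual automation goes here ...
    logging.info('Done')
    return 0

if __name__ == '__main__':
    sys.exit(main())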
Create a .env file:
API_KEY=your-super-secret-api-key-change-this
SECRET_KEY=another-secret-key-for-flask
OTEL_ENDPOINT=http://localhost:4317
SERVICE_NAME=rpa-vm-api
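Don't ship the placeholder keys; Python's secrets module generates strong values for the .env file:
import secrets

# Print URL-safe random keys to paste into .env
print(f'API_KEY={secrets.token_urlsafe(32)}')
print(f'SECRET_KEY={secrets.token_urlsafe(32)}')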
Install dependencies:
cd C:\RPA\api
pip install -r requirements.txt
Test the API locally:
python app.py
Try the health check:
curl http://localhost:5000/health
NSSM (Non-Sucking Service Manager) turns your Python script into a proper Windows service that starts on boot, restarts on failure, and captures stdout/stderr to log files.
1. Install NSSM:
choco install nssm -y
2. Create the service:
# Navigate to your API directory
cd C:\RPA\api
# Install as Windows service
nssm install RPAControlAPI "C:\Python311\python.exe" "C:\RPA\api\app.py"
# Configure service parameters
nssm set RPAControlAPI AppDirectory "C:\RPA\api"
nssm set RPAControlAPI DisplayName "RPA Control API"
nssm set RPAControlAPI Description "Flask API for controlling RPA scripts"
# Set startup type to automatic
nssm set RPAControlAPI Start SERVICE_AUTO_START
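# Run the service as the RPAService account created earlier
# (assumption: recent NSSM builds grant the logon-as-service right when
# ObjectName is set; if yours doesn't, grant it in secpol.msc under
# Local Policies > User Rights Assignment > Log on as a service)
nssm set RPAControlAPI ObjectName ".\RPAService" "YourStrongPassword123!"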
# Configure logging
nssm set RPAControlAPI AppStdout "C:\RPA\logs\api_stdout.log"
nssm set RPAControlAPI AppStderr "C:\RPA\logs\api_stderr.log"
# Rotate logs when they exceed 10MB (AppRotateOnline rotates while the service runs)
nssm set RPAControlAPI AppRotateFiles 1
nssm set RPAControlAPI AppRotateOnline 1
nssm set RPAControlAPI AppRotateBytes 10485760
# Set environment variables
nssm set RPAControlAPI AppEnvironmentExtra "PYTHONUNBUFFERED=1"
# Configure restart on failure
nssm set RPAControlAPI AppExit Default Restart
nssm set RPAControlAPI AppRestartDelay 5000 # 5 second delay before restart
# Throttle restarts: if the app exits within 60 seconds of starting, NSSM backs off before restarting it, preventing a tight crash loop
nssm set RPAControlAPI AppThrottle 60000
# Start the service
nssm start RPAControlAPI
3. Verify the service is running:
# Check service status
nssm status RPAControlAPI
# Or use Windows services
Get-Service RPAControlAPI
# Check the logs
Get-Content C:\RPA\logs\api_stdout.log -Tail 20
4. Useful NSSM commands for management:
# Stop the service
nssm stop RPAControlAPI
# Restart the service
nssm restart RPAControlAPI
# Edit service configuration
nssm edit RPAControlAPI
# Remove service (if needed)
nssm remove RPAControlAPI confirm
Running on HTTP is fine for testing, but production needs HTTPS.
Install nginx as a reverse proxy with SSL:
choco install nginx -y
Configure nginx (C:\tools\nginx\conf\nginx.conf):
worker_processes 1;
events {
worker_connections 1024;
}
http {
# Redirect HTTP to HTTPS
server {
listen 80;
server_name your-vm-domain.com;
return 301 https://$server_name$request_uri;
}
# HTTPS server
server {
listen 443 ssl;
server_name your-vm-domain.com;
ssl_certificate C:/RPA/ssl/cert.pem;
ssl_certificate_key C:/RPA/ssl/key.pem;
ssl_protocols TLSv1.2 TLSv1.3;
ssl_ciphers HIGH:!aNULL:!MD5;
location / {
proxy_pass http://127.0.0.1:5000;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
# Health check should be fast
location /health {
proxy_pass http://127.0.0.1:5000/health;
access_log off;
}
}
}
Generate self-signed certificate (for testing):
# Create SSL directory
New-Item -ItemType Directory -Path C:\RPA\ssl -Force
# Generate certificate (requires OpenSSL - install via choco install openssl)
cd C:\RPA\ssl
openssl req -x509 -newkey rsa:4096 -keyout key.pem -out cert.pem -days 365 -nodes
For production, use Let's Encrypt (free SSL certificates):
choco install win-acme -y
# Follow the prompts to get a real SSL certificate
Install nginx as a service:
nssm install NginxProxy "C:\tools\nginx\nginx.exe"
nssm set NginxProxy AppDirectory "C:\tools\nginx"
nssm start NginxProxy
Update firewall rules:
# Open HTTPS port
New-NetFirewallRule -DisplayName "HTTPS" -Direction Inbound -Protocol TCP -LocalPort 443 -Action Allow
# Close direct access to Flask (only nginx should access it)
Remove-NetFirewallRule -DisplayName "RPA API"
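From an outside machine, a quick port probe confirms that only nginx is reachable (the hostname is a placeholder):
import socket

HOST = 'your-vm.example.com'  # placeholder

# 443 should be open (nginx); 5000 should now be filtered by the firewall
for port in (443, 5000):
    s = socket.socket()
    s.settimeout(3)
    state = 'open' if s.connect_ex((HOST, port)) == 0 else 'closed/filtered'
    print(f'Port {port}: {state}')
    s.close()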
Modify your Flask app to use the Waitress WSGI server. Note that Waitress doesn't terminate TLS itself, so keep it on plain HTTP bound to localhost and let nginx handle HTTPS:
# app.py - replace the block at the bottom
if __name__ == '__main__':
    from waitress import serve
    logger.info("Starting RPA Control API with Waitress")
    # Bind to localhost only; nginx terminates TLS on 443 and proxies here
    serve(app, host='127.0.0.1', port=5000)
Install the OpenTelemetry Collector to aggregate traces and metrics:
1. Download OpenTelemetry Collector:
# Create otel directory
New-Item -ItemType Directory -Path C:\RPA\otel -Force
cd C:\RPA\otel
# Download collector (check for latest version)
Invoke-WebRequest -Uri "https://github.com/open-telemetry/opentelemetry-collector-releases/releases/download/v0.91.0/otelcol_0.91.0_windows_amd64.tar.gz" -OutFile "otelcol.tar.gz"
# Extract (requires 7zip)
choco install 7zip -y
& "C:\Program Files\7-Zip\7z.exe" x otelcol.tar.gz
& "C:\Program Files\7-Zip\7z.exe" x otelcol.tar
2. Create the OpenTelemetry Collector config (C:\RPA\otel\config.yaml):
receivers:
otlp:
protocols:
grpc:
endpoint: 0.0.0.0:4317
http:
endpoint: 0.0.0.0:4318
# Host metrics
hostmetrics:
collection_interval: 30s
scrapers:
cpu:
memory:
disk:
network:
processors:
batch:
timeout: 10s
send_batch_size: 1024
# Add resource attributes
resource:
attributes:
- key: service.instance.id
value: rpa-vm-01
action: insert
exporters:
# Log to file
file:
path: C:\RPA\logs\otel_traces.json
# Send to external service (optional - configure your backend)
# otlp:
# endpoint: your-observability-backend:4317
# Prometheus metrics (optional)
prometheus:
endpoint: "0.0.0.0:8889"
# Debug logging
logging:
loglevel: info
service:
pipelines:
traces:
receivers: [otlp]
processors: [batch, resource]
exporters: [file, logging]
metrics:
receivers: [otlp, hostmetrics]
processors: [batch, resource]
exporters: [file, prometheus, logging]
telemetry:
logs:
level: info
3. Install OpenTelemetry Collector as a service:
nssm install OTelCollector "C:\RPA\otel\otelcol.exe" "--config=C:\RPA\otel\config.yaml"
nssm set OTelCollector AppDirectory "C:\RPA\otel"
nssm set OTelCollector DisplayName "OpenTelemetry Collector"
nssm set OTelCollector Description "Collects telemetry data from RPA services"
nssm set OTelCollector Start SERVICE_AUTO_START
# Start the collector
nssm start OTelCollector
4. Verify OpenTelemetry is receiving data:
# Check the collector logs
Get-Content C:\RPA\logs\otel_traces.json -Tail 20
# Check Prometheus metrics endpoint
curl http://localhost:8889/metrics
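You can also emit a test span from Python to verify the pipeline end to end (a sketch assuming the collector's gRPC receiver on port 4317):
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

# Point a throwaway tracer at the collector's gRPC receiver
provider = TracerProvider()
provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter(endpoint='http://localhost:4317')))
trace.set_tracer_provider(provider)

with trace.get_tracer('otel-smoke-test').start_as_current_span('test-span'):
    print('Span emitted; it should appear in C:\\RPA\\logs\\otel_traces.json')

provider.shutdown()  # flush the batch processor before exiting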
For production, send your telemetry to a proper observability platform:
Option A: Self-hosted (free), for example Prometheus plus Grafana; the collector already exposes a Prometheus endpoint on port 8889.
Option B: Cloud services, for example Grafana Cloud (shown below) or any other backend that accepts OTLP.
Example: Configure for Grafana Cloud by updating your config.yaml:
exporters:
otlp:
endpoint: otlp-gateway-prod-us-central-0.grafana.net:443
headers:
authorization: "Basic your-base64-encoded-credentials"
Create a simple monitoring script that checks your API health:
C:\RPA\monitor\health_monitor.py:
import requests
import time
import logging
import urllib3
from datetime import datetime
# The monitor calls a self-signed HTTPS endpoint with verify=False;
# silence the resulting InsecureRequestWarning noise in the logs
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('C:/RPA/logs/health_monitor.log'),
logging.StreamHandler()
]
)
API_URL = "https://localhost/health"
CHECK_INTERVAL = 60 # seconds
ALERT_THRESHOLD = 3 # consecutive failures before alert
consecutive_failures = 0
def check_health():
global consecutive_failures
try:
response = requests.get(API_URL, verify=False, timeout=10)
if response.status_code == 200:
data = response.json()
status = data.get('status')
if status == 'healthy':
logging.info(f"✓ Health check passed - CPU: {data['system']['cpu_percent']}%, Memory: {data['system']['memory_percent']}%")
consecutive_failures = 0
else:
logging.warning(f"⚠ Health check returned unhealthy status: {data}")
consecutive_failures += 1
else:
logging.error(f"✗ Health check failed with status code: {response.status_code}")
consecutive_failures += 1
except requests.exceptions.RequestException as e:
logging.error(f"✗ Health check failed with exception: {e}")
consecutive_failures += 1
# Alert if threshold exceeded
if consecutive_failures >= ALERT_THRESHOLD:
send_alert(f"RPA API health check failed {consecutive_failures} times consecutively")
def send_alert(message):
"""Send alert via email, Slack, SMS, etc."""
logging.critical(f"ALERT: {message}")
# Example: Send to Slack webhook
# slack_webhook = "your-slack-webhook-url"
# requests.post(slack_webhook, json={'text': message})
# Example: Send email
# send_email(to="admin@company.com", subject="RPA VM Alert", body=message)
if __name__ == '__main__':
logging.info("Starting health monitor")
while True:
check_health()
time.sleep(CHECK_INTERVAL)
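If you use Slack, here's a minimal send_alert replacement using an incoming webhook (the webhook URL is a placeholder you create in your Slack workspace):
import requests

SLACK_WEBHOOK_URL = 'https://hooks.slack.com/services/T000/B000/XXXX'  # placeholder

def send_alert(message):
    """Post the alert to a Slack channel via an incoming webhook."""
    logging.critical(f"ALERT: {message}")
    try:
        # Incoming webhooks accept a JSON payload with a 'text' field
        requests.post(SLACK_WEBHOOK_URL, json={'text': f'RPA VM alert: {message}'}, timeout=10)
    except requests.exceptions.RequestException as e:
        logging.error(f'Failed to send Slack alert: {e}')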
Install as a service:
nssm install RPAHealthMonitor "C:\Python311\python.exe" "C:\RPA\monitor\health_monitor.py"
nssm set RPAHealthMonitor AppDirectory "C:\RPA\monitor"
nssm set RPAHealthMonitor Start SERVICE_AUTO_START
nssm start RPAHealthMonitor
Now that your VM is set up, here's how to interact with it:
Python client:
import requests
class RPAClient:
def __init__(self, base_url, api_key):
self.base_url = base_url
self.headers = {'X-API-Key': api_key}
def health_check(self):
response = requests.get(f"{self.base_url}/health")
return response.json()
def list_scripts(self):
response = requests.get(
f"{self.base_url}/api/scripts",
headers=self.headers
)
return response.json()
def start_script(self, script_name, parameters=None):
response = requests.post(
f"{self.base_url}/api/scripts/{script_name}/start",
headers=self.headers,
json={'parameters': parameters or {}}
)
return response.json()
def stop_script(self, script_name):
response = requests.post(
f"{self.base_url}/api/scripts/{script_name}/stop",
headers=self.headers
)
return response.json()
def get_script_status(self, script_name):
response = requests.get(
f"{self.base_url}/api/scripts/{script_name}/status",
headers=self.headers
)
return response.json()
def get_logs(self, script_name, lines=100):
response = requests.get(
f"{self.base_url}/api/logs/{script_name}",
headers=self.headers,
params={'lines': lines}
)
return response.json()
# Usage
client = RPAClient(
base_url="https://your-vm.example.com",
api_key="your-super-secret-api-key"
)
# Check health
health = client.health_check()
print(f"VM Status: {health['status']}")
# Start a script
result = client.start_script('process_invoices', {'batch_size': 50})
print(f"Script started: {result}")
# Check status
status = client.get_script_status('process_invoices')
print(f"Is running: {status['is_running']}")
# Get logs
logs = client.get_logs('process_invoices', lines=50)
print('\n'.join(logs['lines'][-10:])) # Last 10 lines
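For fire-and-wait jobs, a small polling helper on top of the client is handy (a sketch; tune the intervals to your workload):
import time

def wait_for_completion(client, script_name, poll_seconds=30, timeout_seconds=3600):
    """Poll the status endpoint until the script finishes or the timeout hits."""
    deadline = time.time() + timeout_seconds
    while time.time() < deadline:
        status = client.get_script_status(script_name)
        if not status.get('is_running'):
            return status  # includes exit_code when the API captured one
        time.sleep(poll_seconds)
    raise TimeoutError(f'{script_name} still running after {timeout_seconds}s')

# Block until the invoice run finishes
final = wait_for_completion(client, 'process_invoices')
print(f"Exit code: {final.get('exit_code')}")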
cURL examples:
# Health check (no auth required)
curl https://your-vm.example.com/health
# List scripts
curl -H "X-API-Key: your-api-key" https://your-vm.example.com/api/scripts
# Start script
curl -X POST \
-H "X-API-Key: your-api-key" \
-H "Content-Type: application/json" \
-d '{"parameters": {"customer_id": "12345"}}' \
https://your-vm.example.com/api/scripts/process_invoices/start
# Check status
curl -H "X-API-Key: your-api-key" \
https://your-vm.example.com/api/scripts/process_invoices/status
# Stop script
curl -X POST \
-H "X-API-Key: your-api-key" \
https://your-vm.example.com/api/scripts/process_invoices/stop
Daily checks: skim the health monitor log and confirm the API, collector, and monitor services are running.
Weekly tasks: review script logs for errors, check disk space, and confirm backups are landing.
Monthly tasks: apply Windows updates in an off-hours maintenance window and check for outdated Python dependencies:
pip list --outdated
Backup strategy:
# Create a backup script
# C:\RPA\backup\backup.ps1
$timestamp = Get-Date -Format "yyyyMMdd_HHmmss"
$backupPath = "C:\RPA\backups\backup_$timestamp"
# Create backup directory
New-Item -ItemType Directory -Path $backupPath -Force
# Backup scripts (copying the folder itself creates it under the destination)
Copy-Item -Path "C:\RPA\scripts" -Destination "$backupPath\scripts" -Recurse
# Backup API
Copy-Item -Path "C:\RPA\api" -Destination "$backupPath\api" -Recurse
# Backup configs
Copy-Item -Path "C:\RPA\otel\config.yaml" -Destination "$backupPath\"
Copy-Item -Path "C:\RPA\api\.env" -Destination "$backupPath\"
# Compress backup
Compress-Archive -Path $backupPath -DestinationPath "$backupPath.zip"
Remove-Item -Path $backupPath -Recurse
Write-Host "Backup completed: $backupPath.zip"
# Optional: Upload to S3, Azure Blob, etc.
Schedule the backup:
# Run daily at 2 AM
$action = New-ScheduledTaskAction -Execute "PowerShell.exe" -Argument "-File C:\RPA\backup\backup.ps1"
$trigger = New-ScheduledTaskTrigger -Daily -At 2am
Register-ScheduledTask -Action $action -Trigger $trigger -TaskName "RPA Backup" -Description "Daily backup of RPA configuration"
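The optional upload step at the end of the backup script could look like this with boto3 (bucket name and key prefix are placeholders):
import boto3
from pathlib import Path

def upload_latest_backup(backup_dir=r'C:\RPA\backups', bucket='your-rpa-backups'):
    """Upload the newest backup zip to S3 (bucket name is a placeholder)."""
    backups = list(Path(backup_dir).glob('backup_*.zip'))
    if not backups:
        raise FileNotFoundError('no backup zips found')
    latest = max(backups, key=lambda p: p.stat().st_mtime)
    boto3.client('s3').upload_file(str(latest), bucket, f'rpa-vm-01/{latest.name}')
    print(f'Uploaded {latest.name} to s3://{bucket}')

if __name__ == '__main__':
    upload_latest_backup()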
Service won't start:
# Check service status
nssm status RPAControlAPI
# View service logs
Get-Content C:\RPA\logs\api_stderr.log
# Test the script manually
cd C:\RPA\api
python app.py
Can't access API from external machine:
# Verify service is listening
netstat -ano | findstr "5000"
# Check firewall rules
Get-NetFirewallRule | Where-Object {$_.DisplayName -like "*RPA*"}
# Test locally first
curl http://localhost:5000/health
# Check cloud provider security groups (AWS, Azure, GCP)
High resource usage:
# Check what's using resources
Get-Process | Sort-Object CPU -Descending | Select-Object -First 10
# Restart services if needed
nssm restart RPAControlAPI
# Check for runaway RPA processes
Get-Process python
Setting up a production-ready RPA VM is more than just installing Python and running scripts. You need a hardened VM, an authenticated API for remote control, real Windows service management, HTTPS, telemetry, health monitoring with alerting, and backups.
This setup might seem like overkill when you're just getting started, but it pays dividends when you're running dozens of automations across multiple VMs. The API gives you programmatic control, the monitoring tells you when things break, and the service management keeps everything running even when Windows decides to update at 3 AM.
Build it right once, and you won't be the person RDPing into VMs at midnight to restart stuck scripts.