Skip to content

Instantly share code, notes, and snippets.

@Somberor
Last active January 31, 2026 17:41
Show Gist options
  • Select an option

  • Save Somberor/0899bf25e2ab2ee2908d0e0c4b813303 to your computer and use it in GitHub Desktop.

Select an option

Save Somberor/0899bf25e2ab2ee2908d0e0c4b813303 to your computer and use it in GitHub Desktop.
AI Servers (NSFW + OCR) for Ubuntu + RTX 4000 Ada GPU
import os
import time
import traceback
import asyncio
from typing import Dict, Any, List, Optional
from fastapi import FastAPI, File, UploadFile, HTTPException
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
import uvicorn
from pydantic import BaseModel
from concurrent.futures import ThreadPoolExecutor
import uuid
from datetime import datetime, timedelta
import threading
import gc
os.environ['CUDA_VISIBLE_DEVICES'] = '0'
os.environ['TF_FORCE_GPU_ALLOW_GROWTH'] = 'true'
import tensorflow as tf
def setup_gpu():
    """Turn on memory growth for every GPU TensorFlow can see.

    Returns:
        The list of physical GPU devices reported by TensorFlow; empty when
        no GPU is visible.
    """
    detected = tf.config.experimental.list_physical_devices('GPU')
    if not detected:
        print("✗ No GPUs found. Running on CPU.")
        return detected

    try:
        for device in detected:
            tf.config.experimental.set_memory_growth(device, True)
        print(f"✓ Found {len(detected)} GPU(s)")
    except RuntimeError as e:
        # Configuration problems are reported but do not stop the server.
        print(f"GPU configuration error: {e}")
    return detected
# Configure the GPU once at import time; `gpus` is read later by the
# startup banner and the status endpoint.
gpus = setup_gpu()

# OpenNSFW2 is treated as optional at import time: when it is missing the
# server still starts and `n2` is left as None.
try:
    import opennsfw2 as n2
    print("✓ OpenNSFW2 loaded successfully")
except ImportError as e:
    print(f"✗ OpenNSFW2 not installed: {e}")
    n2 = None
class JobStatus:
    """String constants naming the lifecycle states of a queued job."""

    PENDING = "pending"          # registered, not yet picked up by a worker
    PROCESSING = "processing"    # a worker is currently running inference
    COMPLETED = "completed"      # finished with a result
    FAILED = "failed"            # finished with an error
class Job(BaseModel):
    """Record describing one submitted job; returned as-is by the job endpoint."""

    # Identifier handed back to the client for polling.
    id: str
    # One of the JobStatus constants.
    status: str = JobStatus.PENDING
    # Timestamps: creation is always set, the other two are filled in by the worker.
    created_at: datetime
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    # Exactly one of these is populated once the job has finished.
    result: Optional[Dict[str, Any]] = None
    error: Optional[str] = None
class JobQueue:
    """Track submitted jobs in memory and feed them to the worker coroutines.

    ``jobs`` maps a job id to its ``Job`` record (read by the polling
    endpoint); ``queue`` carries ``(job_id, image_path)`` tuples to the
    workers. Finished and stale records are evicted by age and by a soft
    size cap.
    """

    def __init__(self, max_concurrent_jobs=64, max_job_age_hours=24, max_jobs_in_memory=1000):
        """Create an empty queue.

        Args:
            max_concurrent_jobs: Number of worker coroutines the server starts.
            max_job_age_hours: Age after which finished or still-pending
                records are dropped by ``cleanup_old_jobs``.
            max_jobs_in_memory: Soft cap on the number of stored records.
        """
        self.jobs: Dict[str, Job] = {}
        self.queue: asyncio.Queue = asyncio.Queue()
        self.max_concurrent = max_concurrent_jobs
        self.active_jobs = 0
        self.lock = threading.Lock()
        self.max_job_age = timedelta(hours=max_job_age_hours)
        self.max_jobs_in_memory = max_jobs_in_memory
        self.cleanup_interval = 300  # minimum seconds between cleanup passes
        self.last_cleanup = datetime.now()

    async def add_job(self, job_id: str, image_path: str) -> Job:
        """Register a new pending job and enqueue it for the workers.

        Returns:
            The freshly created ``Job`` record.
        """
        await self.cleanup_old_jobs()
        job = Job(id=job_id, created_at=datetime.now())
        with self.lock:
            # Make room for the new record when the cap has been reached.
            overflow = len(self.jobs) - self.max_jobs_in_memory + 1
            if overflow > 0:
                self._remove_oldest_completed_jobs(overflow)
            self.jobs[job_id] = job
        await self.queue.put((job_id, image_path))
        return job

    def get_job(self, job_id: str) -> Optional[Job]:
        """Return the record for *job_id*, or None when it is unknown or evicted."""
        return self.jobs.get(job_id)

    def get_all_jobs(self) -> List[Job]:
        """Return a snapshot list of every stored job record."""
        return list(self.jobs.values())

    def _remove_oldest_completed_jobs(self, count: int):
        """Evict up to *count* finished jobs, oldest first.

        Only completed or failed jobs are candidates, so the cap is soft:
        when every record is still pending or processing nothing is removed.
        The caller is expected to hold ``self.lock``.
        """
        finished = [
            (job_id, job) for job_id, job in self.jobs.items()
            if job.status in (JobStatus.COMPLETED, JobStatus.FAILED)
        ]
        finished.sort(key=lambda item: item[1].completed_at or item[1].created_at)
        for job_id, _ in finished[:max(count, 0)]:
            del self.jobs[job_id]

    async def cleanup_old_jobs(self):
        """Drop records older than ``max_job_age``, at most once per interval."""
        current_time = datetime.now()
        # total_seconds() is required here: timedelta.seconds is only the
        # sub-day component and wraps to a small value after each full day,
        # which would wrongly skip cleanup after a long idle period.
        if (current_time - self.last_cleanup).total_seconds() < self.cleanup_interval:
            return
        with self.lock:
            self.last_cleanup = current_time
            cutoff_time = current_time - self.max_job_age
            jobs_to_remove = [
                job_id for job_id, job in self.jobs.items()
                if (job.status in [JobStatus.COMPLETED, JobStatus.FAILED] and job.completed_at and job.completed_at < cutoff_time)
                or (job.status == JobStatus.PENDING and job.created_at < cutoff_time)
            ]
            for job_id in jobs_to_remove:
                del self.jobs[job_id]
            if jobs_to_remove:
                gc.collect()
# Process-wide singletons shared by the endpoints and the worker coroutines.
job_queue = JobQueue(max_concurrent_jobs=64)
executor = ThreadPoolExecutor(max_workers=64)
# predict_nsfw takes this lock around the model call, so predictions run one
# at a time regardless of the executor size.
model_lock = threading.Lock()

app = FastAPI(title="NSFW Detection API (OpenNSFW2)", version="5.0.0")

# NOTE(review): wildcard origins together with allow_credentials=True is very
# permissive — confirm this is intended for the deployment.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
def predict_nsfw(image_path: str) -> float:
    """Return the NSFW probability OpenNSFW2 assigns to the image at *image_path*.

    Raises:
        RuntimeError: If the opennsfw2 package could not be imported.
    """
    if not n2:
        raise RuntimeError("OpenNSFW2 not available")

    # Only one prediction runs at a time.
    with model_lock:
        score = n2.predict_image(image_path)
        return float(score)
async def job_processor():
    """Worker loop: take queued jobs, run the prediction, record the outcome.

    Runs forever. Each iteration pulls one ``(job_id, image_path)`` item,
    runs ``predict_nsfw`` in the thread pool and stores either the result or
    the error text on the job record. The uploaded temp file is always
    removed afterwards, including when the job record has already been
    evicted from the registry.
    """
    loop = asyncio.get_event_loop()
    while True:
        try:
            job_id, image_path = await job_queue.queue.get()
            try:
                job = job_queue.get_job(job_id)
                if job:
                    with job_queue.lock:
                        job.status = JobStatus.PROCESSING
                        job.started_at = datetime.now()
                        job_queue.active_jobs += 1
                    try:
                        probability = await loop.run_in_executor(executor, predict_nsfw, image_path)
                        job.status = JobStatus.COMPLETED
                        job.completed_at = datetime.now()
                        job.result = {"nsfw_probability": probability}
                    except Exception as e:
                        job.status = JobStatus.FAILED
                        job.error = str(e)
                        job.completed_at = datetime.now()
                    finally:
                        # Decrement before touching the filesystem so a failed
                        # delete can never leave the counter stuck.
                        with job_queue.lock:
                            job_queue.active_jobs -= 1
            finally:
                # The upload must be removed whether or not the job record
                # still exists; otherwise evicted jobs leak temp files.
                try:
                    os.remove(image_path)
                except FileNotFoundError:
                    pass
                except OSError as e:
                    print(f"Job processor error: could not remove {image_path}: {e}")
        except Exception as e:
            print(f"Job processor error: {e}")
            # Back off briefly so a persistent failure does not spin the loop.
            await asyncio.sleep(1)
@app.on_event("startup")
async def startup_event():
    """Start the worker coroutines and print the startup banner."""
    # The event loop keeps only weak references to tasks, so hold strong
    # references on the application state for the lifetime of the server.
    app.state.job_processor_tasks = [
        asyncio.create_task(job_processor())
        for _ in range(job_queue.max_concurrent)
    ]
    print("\n=== NSFW Detection Server (OpenNSFW2) ===")
    print("Model: Yahoo OpenNSFW2")
    print(f"GPU Available: {len(gpus) > 0}")
    print(f"Max Concurrent Jobs: {job_queue.max_concurrent}")
    print("=========================================\n")
@app.get("/")
async def root():
    """Report server status, model availability and current queue load."""
    queue_stats = {
        "active": job_queue.active_jobs,
        "pending": job_queue.queue.qsize(),
        "max_concurrent": job_queue.max_concurrent
    }
    has_gpu = len(gpus) > 0
    return {
        "status": "online",
        "version": "5.0.0",
        "model": "OpenNSFW2",
        "model_loaded": n2 is not None,
        "gpu_available": has_gpu,
        "queue": queue_stats
    }
@app.post("/testImageSync")
async def test_image_sync(file: UploadFile = File(...)):
    """Classify one uploaded image and return the result in the same request.

    Returns:
        A dict with ``nsfw_probability`` and ``processing_time`` (seconds).

    Raises:
        HTTPException: 400 when the declared content type is not an image,
            500 when saving or classifying the upload fails.
    """
    if file.content_type and not file.content_type.startswith('image/'):
        raise HTTPException(status_code=400, detail="File must be an image")
    # The filename comes from the client: keep only its last path component
    # so separators cannot redirect or break the temp-file write, and fall
    # back to a fixed name when none was sent.
    safe_name = os.path.basename((file.filename or "upload").replace("\\", "/"))
    temp_path = f"temp_sync_{uuid.uuid4()}_{safe_name}"
    try:
        contents = await file.read()
        with open(temp_path, 'wb') as f:
            f.write(contents)
        start = time.time()
        loop = asyncio.get_event_loop()
        probability = await loop.run_in_executor(executor, predict_nsfw, temp_path)
        return {"nsfw_probability": probability, "processing_time": time.time() - start}
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        # The temp file is only needed for the duration of the prediction.
        if os.path.exists(temp_path):
            os.remove(temp_path)
@app.post("/testImage")
async def test_image(file: UploadFile = File(...)):
    """Save one uploaded image and queue it for background classification.

    Returns:
        A dict with the ``job_id`` to poll, the initial ``status`` and the
        queue size at submission time as ``queue_position``.

    Raises:
        HTTPException: 400 when the declared content type is not an image,
            500 when the upload cannot be saved or queued.
    """
    if file.content_type and not file.content_type.startswith('image/'):
        raise HTTPException(status_code=400, detail="File must be an image")
    job_id = str(uuid.uuid4())
    # The filename comes from the client: keep only its last path component
    # so separators cannot redirect or break the temp-file write, and fall
    # back to a fixed name when none was sent.
    safe_name = os.path.basename((file.filename or "upload").replace("\\", "/"))
    temp_path = f"temp_{job_id}_{safe_name}"
    try:
        contents = await file.read()
        with open(temp_path, 'wb') as f:
            f.write(contents)
        job = await job_queue.add_job(job_id, temp_path)
        return {"job_id": job_id, "status": job.status, "queue_position": job_queue.queue.qsize()}
    except Exception as e:
        # Nothing will process the file if queuing failed, so remove it here.
        if os.path.exists(temp_path):
            os.remove(temp_path)
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/job/{job_id}")
async def get_job_status(job_id: str):
    """Return the stored record for *job_id*, or respond 404 when there is none."""
    record = job_queue.get_job(job_id)
    if record:
        return record
    raise HTTPException(status_code=404, detail="Job not found")
@app.post("/testImagesBatch")
async def test_images_batch(files: List[UploadFile] = File(...)):
    """Queue several uploaded images for background classification.

    Files whose declared content type is not an image are skipped, and files
    that cannot be saved or queued are logged and skipped.

    Returns:
        A dict with the ``job_ids`` that were queued and their ``count``.
    """
    job_ids = []
    for file in files:
        if file.content_type and not file.content_type.startswith('image/'):
            continue
        job_id = str(uuid.uuid4())
        # The filename comes from the client: keep only its last path
        # component so separators cannot redirect or break the write.
        safe_name = os.path.basename((file.filename or "upload").replace("\\", "/"))
        temp_path = f"temp_{job_id}_{safe_name}"
        try:
            contents = await file.read()
            with open(temp_path, 'wb') as f:
                f.write(contents)
            await job_queue.add_job(job_id, temp_path)
            job_ids.append(job_id)
        except Exception:
            # Best effort per file: log the failure instead of hiding it,
            # clean up, and carry on with the rest of the batch.
            traceback.print_exc()
            if os.path.exists(temp_path):
                os.remove(temp_path)
    return {"job_ids": job_ids, "count": len(job_ids)}
# Script entry point: serve on all interfaces, port 8000, in a single worker
# process.
if __name__ == "__main__":
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8000,
        workers=1,
    )
"""
OCR Server - EasyOCR-based text extraction API
Provides text extraction from images using EasyOCR with GPU acceleration.
Designed to run alongside NSFW detection on shared GPU infrastructure.
"""
import os
import time
import traceback
import asyncio
from typing import Dict, Any, Optional, Tuple
from fastapi import FastAPI, File, UploadFile, HTTPException, Form
from fastapi.middleware.cors import CORSMiddleware
import uvicorn
from pydantic import BaseModel
from concurrent.futures import ThreadPoolExecutor
import uuid
from datetime import datetime, timedelta
import threading
import gc
from io import BytesIO
# GPU Configuration - Use GPU 0 by default
os.environ['CUDA_VISIBLE_DEVICES'] = '0'
import torch
import easyocr
from PIL import Image
import numpy as np
def setup_gpu():
    """Report whether PyTorch can use a CUDA GPU and print what was found.

    Returns:
        True when CUDA is available, False otherwise.
    """
    if not torch.cuda.is_available():
        print("✗ No CUDA GPUs found. Running on CPU.")
        return False

    gpu_total = torch.cuda.device_count()
    first_gpu_name = torch.cuda.get_device_name(0)
    print(f"✓ Found {gpu_total} GPU(s): {first_gpu_name}")
    print(f"✓ CUDA version: {torch.version.cuda}")
    return True
# Detect the GPU once at import time; the flag is reused by the endpoints.
gpu_available = setup_gpu()

# Single shared EasyOCR reader (English only), created once at import time.
print("[OCRServer] Loading EasyOCR model...")
reader = easyocr.Reader(['en'], gpu=gpu_available, verbose=False)
print(f"[OCRServer] EasyOCR ready (GPU={gpu_available})")
class JobStatus:
    """String constants naming the lifecycle states of a queued OCR job."""

    PENDING = "pending"          # registered, not yet picked up by a worker
    PROCESSING = "processing"    # a worker is currently running OCR
    COMPLETED = "completed"      # finished with a result
    FAILED = "failed"            # finished with an error
class Job(BaseModel):
    """Record describing one submitted OCR job; returned as-is by the job endpoint."""

    # Identifier handed back to the client for polling.
    id: str
    # One of the JobStatus constants.
    status: str = JobStatus.PENDING
    # Timestamps: creation is always set, the other two are filled in by the worker.
    created_at: datetime
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    # Exactly one of these is populated once the job has finished.
    result: Optional[Dict[str, Any]] = None
    error: Optional[str] = None
class JobQueue:
    """Track submitted OCR jobs in memory and feed them to the worker coroutines.

    ``jobs`` maps a job id to its ``Job`` record (read by the polling
    endpoint); ``queue`` carries ``(job_id, image_data, bbox)`` tuples to the
    workers. Finished and stale records are evicted by age and by a soft
    size cap.
    """

    def __init__(self, max_concurrent_jobs=32, max_job_age_hours=24, max_jobs_in_memory=500):
        """Create an empty queue.

        Args:
            max_concurrent_jobs: Number of worker coroutines the server starts.
            max_job_age_hours: Age after which finished or still-pending
                records are dropped by ``cleanup_old_jobs``.
            max_jobs_in_memory: Soft cap on the number of stored records.
        """
        self.jobs: Dict[str, Job] = {}
        self.queue: asyncio.Queue = asyncio.Queue()
        self.max_concurrent = max_concurrent_jobs
        self.active_jobs = 0
        self.lock = threading.Lock()
        self.max_job_age = timedelta(hours=max_job_age_hours)
        self.max_jobs_in_memory = max_jobs_in_memory
        self.cleanup_interval = 300  # minimum seconds between cleanup passes
        self.last_cleanup = datetime.now()

    async def add_job(self, job_id: str, image_data: bytes, bbox: Optional[Tuple] = None) -> Job:
        """Register a new pending job and enqueue it for the workers.

        Args:
            job_id: Identifier the client will poll with.
            image_data: Raw bytes of the uploaded image.
            bbox: Optional ``(left, top, width, height)`` region to OCR.

        Returns:
            The freshly created ``Job`` record.
        """
        await self.cleanup_old_jobs()
        job = Job(id=job_id, created_at=datetime.now())
        with self.lock:
            # Make room for the new record when the cap has been reached.
            overflow = len(self.jobs) - self.max_jobs_in_memory + 1
            if overflow > 0:
                self._remove_oldest_completed_jobs(overflow)
            self.jobs[job_id] = job
        await self.queue.put((job_id, image_data, bbox))
        return job

    def get_job(self, job_id: str) -> Optional[Job]:
        """Return the record for *job_id*, or None when it is unknown or evicted."""
        return self.jobs.get(job_id)

    def _remove_oldest_completed_jobs(self, count: int):
        """Evict up to *count* finished jobs, oldest first.

        Only completed or failed jobs are candidates, so the cap is soft:
        when every record is still pending or processing nothing is removed.
        The caller is expected to hold ``self.lock``.
        """
        finished = [
            (job_id, job) for job_id, job in self.jobs.items()
            if job.status in (JobStatus.COMPLETED, JobStatus.FAILED)
        ]
        finished.sort(key=lambda item: item[1].completed_at or item[1].created_at)
        for job_id, _ in finished[:max(count, 0)]:
            del self.jobs[job_id]

    async def cleanup_old_jobs(self):
        """Drop records older than ``max_job_age``, at most once per interval."""
        current_time = datetime.now()
        # total_seconds() is required here: timedelta.seconds is only the
        # sub-day component and wraps to a small value after each full day,
        # which would wrongly skip cleanup after a long idle period.
        if (current_time - self.last_cleanup).total_seconds() < self.cleanup_interval:
            return
        with self.lock:
            self.last_cleanup = current_time
            cutoff_time = current_time - self.max_job_age
            jobs_to_remove = [
                job_id for job_id, job in self.jobs.items()
                if (job.status in [JobStatus.COMPLETED, JobStatus.FAILED] and job.completed_at and job.completed_at < cutoff_time)
                or (job.status == JobStatus.PENDING and job.created_at < cutoff_time)
            ]
            for job_id in jobs_to_remove:
                del self.jobs[job_id]
            if jobs_to_remove:
                gc.collect()
# Process-wide singletons shared by the endpoints and the worker coroutines.
job_queue = JobQueue(max_concurrent_jobs=32)
executor = ThreadPoolExecutor(max_workers=8)
# extract_text takes this lock around the OCR call, so the shared reader is
# used by one thread at a time.
model_lock = threading.Lock()

app = FastAPI(title="OCR Detection API (EasyOCR)", version="1.0.0")

# NOTE(review): wildcard origins together with allow_credentials=True is very
# permissive — confirm this is intended for the deployment.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
def extract_text(image_data: bytes, bbox: Optional[Tuple[int, int, int, int]] = None, padding: int = 5) -> Dict[str, Any]:
    """Extract text from image, optionally from a specific region.

    Args:
        image_data: Raw bytes of an encoded image.
        bbox: Optional ``(left, top, width, height)`` region in pixels. The
            region is grown by *padding* on every side and clipped to the
            image before OCR runs.
        padding: Extra pixels added around *bbox*.

    Returns:
        A dict with ``results`` (one entry per detection holding ``text``,
        ``confidence`` and ``bbox``), ``full_text`` (all texts joined by
        spaces) and ``count``. Each result ``bbox`` is ``[x1, y1, x2, y2]``
        built from the first and third corner of the detection box, in the
        coordinates of the image handed to the OCR — that is, relative to
        the cropped region when *bbox* is given.

    Raises:
        ValueError: If the clipped crop region is empty.
    """
    # Decoding and cropping touch only this request's data, so they stay
    # outside the model lock; only the shared reader call is serialized.
    image = Image.open(BytesIO(image_data))
    if bbox:
        left, top, width, height = bbox
        img_width, img_height = image.size
        x1 = max(0, left - padding)
        y1 = max(0, top - padding)
        x2 = min(img_width, left + width + padding)
        y2 = min(img_height, top + height + padding)
        if x2 <= x1 or y2 <= y1:
            raise ValueError(
                f"Crop region {bbox} does not overlap the {img_width}x{img_height} image"
            )
        image = image.crop((x1, y1, x2, y2))

    # Palette, 1-bit, CMYK and similar modes do not become pixel arrays the
    # OCR can interpret (e.g. palette images yield palette indices), so
    # normalize everything except plain RGB / RGBA / grayscale to RGB.
    if image.mode not in ("RGB", "RGBA", "L"):
        image = image.convert("RGB")
    image_np = np.array(image)

    with model_lock:
        results = reader.readtext(image_np)

    ocr_results = []
    full_text = []
    for (box, text, confidence) in results:
        ocr_results.append({
            "text": text,
            "confidence": float(confidence),
            "bbox": [int(box[0][0]), int(box[0][1]), int(box[2][0]), int(box[2][1])]
        })
        full_text.append(text)
    return {
        "results": ocr_results,
        "full_text": " ".join(full_text),
        "count": len(ocr_results)
    }
async def job_processor():
    """Worker loop: take queued OCR jobs, run extraction, record the outcome.

    Runs forever. Items whose job record no longer exists are dropped;
    otherwise ``extract_text`` runs in the thread pool and the result or the
    error text is stored on the job record.
    """
    loop = asyncio.get_event_loop()
    while True:
        try:
            job_id, image_data, bbox = await job_queue.queue.get()
            job = job_queue.get_job(job_id)
            if not job:
                # The record is gone; there is nothing to update.
                continue

            with job_queue.lock:
                job.status = JobStatus.PROCESSING
                job.started_at = datetime.now()
                job_queue.active_jobs += 1

            try:
                outcome = await loop.run_in_executor(executor, extract_text, image_data, bbox)
                job.status = JobStatus.COMPLETED
                job.completed_at = datetime.now()
                job.result = outcome
            except Exception as e:
                job.status = JobStatus.FAILED
                job.error = str(e)
                job.completed_at = datetime.now()
            finally:
                with job_queue.lock:
                    job_queue.active_jobs -= 1
        except Exception as e:
            print(f"Job processor error: {e}")
            # Back off briefly so a persistent failure does not spin the loop.
            await asyncio.sleep(1)
@app.on_event("startup")
async def startup_event():
    """Start the worker coroutines and print the startup banner."""
    # The event loop keeps only weak references to tasks, so hold strong
    # references on the application state for the lifetime of the server.
    app.state.job_processor_tasks = [
        asyncio.create_task(job_processor())
        for _ in range(job_queue.max_concurrent)
    ]
    print("\n=== OCR Server (EasyOCR) ===")
    print("Model: EasyOCR (English)")
    print(f"GPU Available: {gpu_available}")
    print(f"Max Concurrent Jobs: {job_queue.max_concurrent}")
    print("============================\n")
@app.get("/")
async def root():
    """Report server status, GPU/CUDA information and current queue load."""
    queue_stats = {
        "active": job_queue.active_jobs,
        "pending": job_queue.queue.qsize(),
        "max_concurrent": job_queue.max_concurrent
    }
    if gpu_available:
        cuda_version = torch.version.cuda
    else:
        cuda_version = None
    return {
        "status": "online",
        "version": "1.0.0",
        "model": "EasyOCR",
        "gpu_available": gpu_available,
        "cuda_version": cuda_version,
        "queue": queue_stats
    }
@app.post("/extractSync")
async def extract_sync(
    file: UploadFile = File(...),
    left: Optional[int] = Form(None),
    top: Optional[int] = Form(None),
    width: Optional[int] = Form(None),
    height: Optional[int] = Form(None),
    padding: int = Form(5)
):
    """Run OCR on the upload within the request and return the full result.

    The region is applied only when all four of left/top/width/height are
    supplied; otherwise the whole image is processed. The response is the
    ``extract_text`` result plus ``processing_time`` in seconds.
    """
    if file.content_type and not file.content_type.startswith('image/'):
        raise HTTPException(status_code=400, detail="File must be an image")
    try:
        payload = await file.read()
        region = (left, top, width, height)
        bbox = None if any(v is None for v in region) else region
        started = time.time()
        result = await asyncio.get_event_loop().run_in_executor(
            executor, extract_text, payload, bbox, padding
        )
        result["processing_time"] = time.time() - started
        return result
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/extract")
async def extract_async(
    file: UploadFile = File(...),
    left: Optional[int] = Form(None),
    top: Optional[int] = Form(None),
    width: Optional[int] = Form(None),
    height: Optional[int] = Form(None)
):
    """Queue the upload for background OCR and return a job id for polling.

    The region is applied only when all four of left/top/width/height are
    supplied; otherwise the whole image is processed.
    """
    if file.content_type and not file.content_type.startswith('image/'):
        raise HTTPException(status_code=400, detail="File must be an image")
    job_id = str(uuid.uuid4())
    try:
        payload = await file.read()
        region = (left, top, width, height)
        bbox = None if any(v is None for v in region) else region
        job = await job_queue.add_job(job_id, payload, bbox)
        response = {
            "job_id": job_id,
            "status": job.status,
            "queue_position": job_queue.queue.qsize(),
        }
        return response
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/job/{job_id}")
async def get_job_status(job_id: str):
    """Return the stored record for *job_id*, or respond 404 when there is none."""
    record = job_queue.get_job(job_id)
    if record:
        return record
    raise HTTPException(status_code=404, detail="Job not found")
@app.post("/contains")
async def contains_text(
    file: UploadFile = File(...),
    search_text: str = Form(...),
    left: Optional[int] = Form(None),
    top: Optional[int] = Form(None),
    width: Optional[int] = Form(None),
    height: Optional[int] = Form(None),
    padding: int = Form(10)
):
    """Report whether the OCR text of the image (or region) contains *search_text*.

    The comparison is a case-insensitive substring test against the joined
    OCR output. The region is applied only when all four of
    left/top/width/height are supplied.
    """
    if file.content_type and not file.content_type.startswith('image/'):
        raise HTTPException(status_code=400, detail="File must be an image")
    try:
        payload = await file.read()
        region = (left, top, width, height)
        bbox = None if any(v is None for v in region) else region
        started = time.time()
        result = await asyncio.get_event_loop().run_in_executor(
            executor, extract_text, payload, bbox, padding
        )
        extracted = result["full_text"]
        found = search_text.lower() in extracted.lower()
        return {
            "contains": found,
            "search_text": search_text,
            "extracted_text": extracted,
            "processing_time": time.time() - started
        }
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/verify")
async def verify_element(
    file: UploadFile = File(...),
    expected_texts: str = Form(...),  # Comma-separated list
    left: Optional[int] = Form(None),
    top: Optional[int] = Form(None),
    width: Optional[int] = Form(None),
    height: Optional[int] = Form(None),
    padding: int = Form(10)
):
    """Verify which of the expected texts is in the element.

    *expected_texts* is a comma-separated list; entries are stripped and
    blank entries are ignored. The first entry found (case-insensitive
    substring) in the OCR output is returned as ``matched``, or None when
    nothing matches. The region is applied only when all four of
    left/top/width/height are supplied.
    """
    if file.content_type and not file.content_type.startswith('image/'):
        raise HTTPException(status_code=400, detail="File must be an image")
    try:
        contents = await file.read()
        bbox = None
        if all(v is not None for v in [left, top, width, height]):
            bbox = (left, top, width, height)
        start = time.time()
        loop = asyncio.get_event_loop()
        result = await loop.run_in_executor(executor, extract_text, contents, bbox, padding)
        extracted_lower = result["full_text"].lower()
        # Drop blank entries (e.g. from a trailing comma): the empty string
        # is a substring of everything and would otherwise always "match".
        expected_list = [t.strip() for t in expected_texts.split(",") if t.strip()]
        matched = next(
            (expected for expected in expected_list if expected.lower() in extracted_lower),
            None,
        )
        return {
            "matched": matched,
            "expected_texts": expected_list,
            "extracted_text": result["full_text"],
            "processing_time": time.time() - start
        }
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))
# Script entry point: serve on all interfaces, port 8001, in a single worker
# process.
if __name__ == "__main__":
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8001,
        workers=1,
    )
#!/bin/bash
# =============================================================================
# GPU AI Servers Setup Script for Ubuntu + RTX 4000 Ada
# =============================================================================
# This script sets up:
# - NVIDIA Drivers + CUDA 12.4
# - Python 3.11 with venv
# - TensorFlow (GPU) for NSFW detection
# - PyTorch (GPU) + EasyOCR for OCR detection
# - PM2 for process management
# =============================================================================
set -e # Exit on error
# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color
# Print a green check-marked progress message.
echo_status() {
    local message="$1"
    echo -e "${GREEN}[✓]${NC} ${message}"
}

# Print a yellow warning message.
echo_warning() {
    local message="$1"
    echo -e "${YELLOW}[!]${NC} ${message}"
}

# Print a red error message.
echo_error() {
    local message="$1"
    echo -e "${RED}[✗]${NC} ${message}"
}
echo ""
echo "=============================================="
echo " GPU AI Servers Setup (RTX 4000 Ada)"
echo "=============================================="
echo ""
# Check if running as root
if [ "$EUID" -eq 0 ]; then
echo_error "Do not run this script as root. Run as your regular user."
exit 1
fi
# =============================================================================
# Step 1: System Update
# =============================================================================
echo ""
echo ">>> Step 1: System Update"
echo ""
sudo apt update && sudo apt upgrade -y
echo_status "System updated"
# =============================================================================
# Step 2: Install NVIDIA Drivers
# =============================================================================
echo ""
echo ">>> Step 2: Install NVIDIA Drivers"
echo ""
# Check if NVIDIA driver is already installed
if command -v nvidia-smi &> /dev/null; then
    echo_status "NVIDIA driver already installed:"
    nvidia-smi --query-gpu=driver_version,name --format=csv,noheader
else
    echo_warning "Installing NVIDIA drivers..."
    # Add NVIDIA PPA for latest drivers
    sudo add-apt-repository -y ppa:graphics-drivers/ppa
    sudo apt update
    # Install recommended driver (usually 545+ for RTX 4000 Ada)
    sudo ubuntu-drivers autoinstall
    echo_warning "NVIDIA drivers installed. A REBOOT is required!"
    echo_warning "After reboot, run this script again to continue setup."
    read -r -p "Press Enter to reboot now, or Ctrl+C to cancel..."
    sudo reboot
    # The reboot command can return before the machine actually goes down.
    # Stop here so the remaining steps do not start running against a driver
    # that is not loaded yet.
    exit 0
fi
# Verify NVIDIA driver
nvidia-smi
echo_status "NVIDIA driver verified"
# =============================================================================
# Step 3: Install CUDA Toolkit 12.4
# =============================================================================
echo ""
echo ">>> Step 3: Install CUDA Toolkit 12.4"
echo ""
# Check if CUDA is already installed
if [ -d "/usr/local/cuda-12.4" ]; then
echo_status "CUDA 12.4 already installed"
else
echo_warning "Installing CUDA 12.4..."
# Download and install CUDA 12.4 (network installer)
wget https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64/cuda-keyring_1.1-1_all.deb
sudo dpkg -i cuda-keyring_1.1-1_all.deb
sudo apt update
sudo apt install -y cuda-toolkit-12-4
rm cuda-keyring_1.1-1_all.deb
echo_status "CUDA 12.4 installed"
fi
# Add CUDA to PATH (persistently for future shells)
if ! grep -q "cuda-12.4" ~/.bashrc; then
    echo "" >> ~/.bashrc
    echo "# CUDA 12.4" >> ~/.bashrc
    echo 'export PATH=/usr/local/cuda-12.4/bin:$PATH' >> ~/.bashrc
    # ${VAR:+:$VAR} appends the old value only when it is non-empty, so an
    # unset LD_LIBRARY_PATH does not leave a trailing ':' (an empty entry,
    # which the loader treats as the current directory).
    echo 'export LD_LIBRARY_PATH=/usr/local/cuda-12.4/lib64${LD_LIBRARY_PATH:+:$LD_LIBRARY_PATH}' >> ~/.bashrc
    echo_status "CUDA paths added to ~/.bashrc"
fi
# Set the paths for the rest of this script directly. ~/.bashrc is not
# sourced here: it is an interactive-shell file, and under `set -e` any
# failing command inside it would abort the setup.
export PATH=/usr/local/cuda-12.4/bin:$PATH
export LD_LIBRARY_PATH=/usr/local/cuda-12.4/lib64${LD_LIBRARY_PATH:+:$LD_LIBRARY_PATH}
# Verify CUDA
nvcc --version
echo_status "CUDA verified"
# =============================================================================
# Step 4: Install cuDNN
# =============================================================================
echo ""
echo ">>> Step 4: Install cuDNN"
echo ""
sudo apt install -y libcudnn8 libcudnn8-dev
echo_status "cuDNN installed"
# =============================================================================
# Step 5: Install Python 3.11
# =============================================================================
echo ""
echo ">>> Step 5: Install Python 3.11"
echo ""
sudo apt install -y software-properties-common
sudo add-apt-repository -y ppa:deadsnakes/ppa
sudo apt update
sudo apt install -y python3.11 python3.11-venv python3.11-dev python3-pip
echo_status "Python 3.11 installed"
# =============================================================================
# Step 6: Install Node.js and PM2
# =============================================================================
echo ""
echo ">>> Step 6: Install Node.js 20 and PM2"
echo ""
# Install Node.js 20.x
if ! command -v node &> /dev/null || [[ $(node -v | cut -d. -f1 | tr -d 'v') -lt 20 ]]; then
curl -fsSL https://deb.nodesource.com/setup_20.x | sudo -E bash -
sudo apt install -y nodejs
fi
node --version
npm --version
# Install PM2 globally
sudo npm install -g pm2
pm2 --version
echo_status "Node.js and PM2 installed"
# =============================================================================
# Step 7: Create project directory and virtual environment
# =============================================================================
echo ""
echo ">>> Step 7: Create project directory"
echo ""
PROJECT_DIR="$HOME/ai-servers"
mkdir -p "$PROJECT_DIR"
cd "$PROJECT_DIR"
# Create Python virtual environment
python3.11 -m venv venv
source venv/bin/activate
echo_status "Virtual environment created at $PROJECT_DIR/venv"
# Upgrade pip
pip install --upgrade pip setuptools wheel
echo_status "pip upgraded"
# =============================================================================
# Step 8: Install Python dependencies
# =============================================================================
echo ""
echo ">>> Step 8: Install Python dependencies"
echo ""
# Install PyTorch with CUDA 12.4 support (for EasyOCR)
pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu124
echo_status "PyTorch with CUDA 12.4 installed"
# Install TensorFlow with CUDA support (for NSFW)
# Quoted so the shell does not treat [and-cuda] as a glob pattern.
pip install "tensorflow[and-cuda]"
echo_status "TensorFlow with CUDA installed"
# Install other dependencies
# Every requirement is quoted: an unquoted `>` is shell output redirection,
# which would silently drop the version constraints (including numpy<2.0.0)
# and create stray files named "=0.10.2", "=1.7.0", ...
pip install \
    "opennsfw2>=0.10.2" \
    "easyocr>=1.7.0" \
    "fastapi>=0.104.1" \
    "uvicorn[standard]>=0.24.0" \
    "pillow>=10.1.0" \
    "numpy>=1.24.3,<2.0.0" \
    "pydantic>=2.5.0" \
    "psutil>=5.9.0" \
    "python-multipart>=0.0.6"
echo_status "All Python dependencies installed"
# =============================================================================
# Step 9: Create requirements.txt
# =============================================================================
echo ""
echo ">>> Step 9: Creating requirements.txt"
echo ""
cat > "$PROJECT_DIR/requirements.txt" << 'EOF'
# GPU Dependencies (install separately with proper CUDA version)
# pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu124
# pip install tensorflow[and-cuda]
# NSFW Detection
opennsfw2>=0.10.2
# OCR Detection
easyocr>=1.7.0
# Web Server
fastapi>=0.104.1
uvicorn[standard]>=0.24.0
# Image Processing
pillow>=10.1.0
numpy>=1.24.3,<2.0.0
# Utilities
pydantic>=2.5.0
psutil>=5.9.0
python-multipart>=0.0.6
EOF
echo_status "requirements.txt created"
# =============================================================================
# Step 10: Verify GPU setup
# =============================================================================
echo ""
echo ">>> Step 10: Verify GPU setup"
echo ""
python3 << 'PYEOF'
import sys
print("=== GPU Verification ===\n")
# Check PyTorch
try:
import torch
print(f"PyTorch version: {torch.__version__}")
print(f"CUDA available (PyTorch): {torch.cuda.is_available()}")
if torch.cuda.is_available():
print(f"CUDA version (PyTorch): {torch.version.cuda}")
print(f"GPU device: {torch.cuda.get_device_name(0)}")
print(f"GPU memory: {torch.cuda.get_device_properties(0).total_memory / 1024**3:.1f} GB")
except Exception as e:
print(f"PyTorch error: {e}")
print()
# Check TensorFlow
try:
import tensorflow as tf
print(f"TensorFlow version: {tf.__version__}")
gpus = tf.config.list_physical_devices('GPU')
print(f"CUDA available (TensorFlow): {len(gpus) > 0}")
if gpus:
for gpu in gpus:
print(f"GPU device: {gpu.name}")
except Exception as e:
print(f"TensorFlow error: {e}")
print()
# Check EasyOCR
try:
import easyocr
print(f"EasyOCR: Installed")
except Exception as e:
print(f"EasyOCR error: {e}")
# Check OpenNSFW2
try:
import opennsfw2
print(f"OpenNSFW2: Installed")
except Exception as e:
print(f"OpenNSFW2 error: {e}")
print("\n=== Verification Complete ===")
PYEOF
echo_status "GPU verification complete"
# =============================================================================
# Step 11: Setup complete
# =============================================================================
echo ""
echo "=============================================="
echo " Setup Complete!"
echo "=============================================="
echo ""
echo "Project directory: $PROJECT_DIR"
echo "Virtual environment: $PROJECT_DIR/venv"
echo ""
echo "To fetch server files and start:"
echo ""
echo " cd $PROJECT_DIR"
echo " source venv/bin/activate"
echo ""
echo " # Fetch server files from gist"
echo " curl -sL <GIST_URL>/nsfwServer.py -o nsfwServer.py"
echo " curl -sL <GIST_URL>/ocrServer.py -o ocrServer.py"
echo ""
echo " # Start with PM2"
echo " pm2 start \"venv/bin/python nsfwServer.py\" --name nsfw-server"
echo " pm2 start \"venv/bin/python ocrServer.py\" --name ocr-server"
echo " pm2 save"
echo " pm2 startup"
echo ""
echo "=============================================="
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment