Synchronous Requests

Learn how to use the synchronous endpoint for real-time computer vision processing.

Overview

The sync endpoint (/v1/cv/requests/sync) returns results in the same HTTP response, making it well suited to interactive applications where users need immediate feedback.

When to Use Sync

Use sync for:

  • Real-time user-facing features
  • Interactive applications
  • Single image processing
  • Immediate feedback requirements
  • Live camera feeds

Don't use sync for:

  • Very high-volume batch processing (the sync endpoint is rate limited to 10 requests/minute)
  • Operations where immediate results aren't needed (sync is currently the only available endpoint)

Request Format

POST /v1/cv/requests/sync

Request:

{
  "external_id": "live-scan-001",
  "image_url": "https://example.com/product.jpg"
}

Success Response:

{
  "idempotency_id": "cv-req-xyz789",
  "external_id": "live-scan-001",
  "status": "DONE",
  "results": [
    {
      "class_name": "Product",
      "model_name": "product-detector-v2",
      "confidence": 0.95,
      "bounding_box": {
        "x": 100,
        "y": 150,
        "width": 200,
        "height": 250
      }
    }
  ],
  "processing_time_ms": 1850
}
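
To make the response concrete, here is a minimal sketch that downloads the submitted image and draws each returned bounding_box onto it with Pillow. It assumes x and y are the top-left corner of the box in pixels and that response is a parsed success response like the one above.

import io
import requests
from PIL import Image, ImageDraw

def draw_detections(image_url, response):
    # Fetch the same image that was submitted for processing
    img = Image.open(io.BytesIO(requests.get(image_url).content)).convert('RGB')
    draw = ImageDraw.Draw(img)

    for detection in response['results']:
        box = detection['bounding_box']
        # Assumes (x, y) is the top-left corner, in pixels
        draw.rectangle(
            [box['x'], box['y'], box['x'] + box['width'], box['y'] + box['height']],
            outline='red',
            width=3
        )
        label = f"{detection['class_name']} {detection['confidence']:.0%}"
        draw.text((box['x'], box['y'] - 12), label, fill='red')

    img.save('detections.jpg')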

Request Parameters

Parameter    Type          Required  Description
-----------  ------------  --------  -----------------------------
external_id  string        Yes       Your unique identifier
image_url    string (URL)  Yes       Publicly accessible image URL

Note: The sync endpoint does not support callback_url since results are returned directly.

Timeout Handling

The sync endpoint has a 29-second timeout (AWS API Gateway limit). If processing takes longer:

Timeout Response (408):

{
  "error": "Request Timeout",
  "message": "Processing exceeded 29 seconds",
  "external_id": "live-scan-001"
}

Handling Timeouts

import os
import time
import requests

API_KEY = os.environ.get('VFROG_API_KEY')
SYNC_URL = "https://api.vfrog.ai/v1/cv/requests/sync"

def process_with_fallback(image_url, external_id, max_retries=3):
    headers = {
        "x-api-key": API_KEY,
        "Content-Type": "application/json"
    }

    for attempt in range(max_retries):
        try:
            # Try sync first
            response = requests.post(
                SYNC_URL,
                headers=headers,
                json={
                    "external_id": external_id,
                    "image_url": image_url
                },
                timeout=30
            )

            if response.status_code == 408:
                # Server-side timeout - retry with exponential backoff
                print("Sync timeout, retrying...")
                time.sleep(2 ** attempt)
                continue

            return response.json()

        except requests.exceptions.Timeout:
            # Network timeout - retry with exponential backoff
            time.sleep(2 ** attempt)
            continue

    return None
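
A minimal usage sketch for the helper above; the image URL and identifier are placeholders:

result = process_with_fallback(
    "https://example.com/product.jpg",
    "live-scan-001"
)

if result is None:
    print("Processing did not complete within the retry budget")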

Error Responses

400 Bad Request

Missing or invalid parameters:

{
  "error": "Bad Request",
  "message": "Missing required field: image_url"
}

403 Forbidden

No classes mapped to your API key:

{
  "error": "Forbidden",
  "message": "No classes mapped to this API key"
}

408 Request Timeout

Processing exceeded 29 seconds:

{
  "error": "Request Timeout",
  "message": "Processing exceeded 29 seconds"
}

429 Too Many Requests

Rate limit exceeded (10 requests/minute):

{
  "error": "Too Many Requests",
  "message": "Rate limit exceeded for sync endpoint"
}
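
Because the limit is 10 requests per minute, it is often easier to throttle requests on the client than to recover from 429s after the fact. Below is a minimal sketch of a sliding-window throttle; the class name and limits are illustrative, with the defaults mirroring the documented rate limit.

import time
from collections import deque

class SyncThrottle:
    """Client-side sliding-window throttle for the 10 requests/minute limit."""

    def __init__(self, max_requests=10, window_seconds=60):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.timestamps = deque()

    def wait(self):
        now = time.monotonic()
        # Drop timestamps that have left the window
        while self.timestamps and now - self.timestamps[0] > self.window_seconds:
            self.timestamps.popleft()
        # If the window is full, sleep until the oldest request ages out
        if len(self.timestamps) >= self.max_requests:
            time.sleep(self.window_seconds - (now - self.timestamps[0]))
        self.timestamps.append(time.monotonic())

# Usage: call throttle.wait() before each sync request
throttle = SyncThrottle()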

500 Internal Server Error

Processing error:

{
  "error": "Internal Server Error",
  "message": "Failed to process image",
  "details": "Model inference failed"
}

Code Examples

Python with Error Handling

import requests
import time
import os
from typing import Dict, Optional

API_KEY = os.environ.get('VFROG_API_KEY')
SYNC_URL = "https://api.vfrog.ai/v1/cv/requests/sync"

def process_image_sync(image_url: str, external_id: str) -> Optional[Dict]:
    """
    Process an image synchronously with comprehensive error handling.
    """
    headers = {
        "x-api-key": API_KEY,
        "Content-Type": "application/json"
    }

    payload = {
        "external_id": external_id,
        "image_url": image_url
    }

    try:
        response = requests.post(
            SYNC_URL,
            headers=headers,
            json=payload,
            timeout=30
        )

        if response.status_code == 200:
            return response.json()
        elif response.status_code == 408:
            print(f"Timeout for {external_id}")
            return None
        elif response.status_code == 429:
            print("Rate limit exceeded, waiting...")
            time.sleep(60)
            return process_image_sync(image_url, external_id)
        else:
            print(f"Error {response.status_code}: {response.text}")
            return None

    except requests.exceptions.Timeout:
        print(f"Network timeout for {external_id}")
        return None
    except Exception as e:
        print(f"Unexpected error: {e}")
        return None

# Usage
result = process_image_sync(
    "https://example.com/product.jpg",
    "scan-001"
)

if result and result['status'] == 'DONE':
    for detection in result['results']:
        print(f"Found {detection['class_name']} "
              f"with {detection['confidence']:.2%} confidence")

JavaScript/Node.js with Retry

const axios = require('axios')

const API_KEY = process.env.VFROG_API_KEY
const SYNC_URL = 'https://api.vfrog.ai/v1/cv/requests/sync'

async function processImageSync(imageUrl, externalId, retries = 3) {
  for (let attempt = 1; attempt <= retries; attempt++) {
    try {
      const response = await axios.post(
        SYNC_URL,
        {
          external_id: externalId,
          image_url: imageUrl,
        },
        {
          headers: {
            'x-api-key': API_KEY,
            'Content-Type': 'application/json',
          },
          timeout: 30000,
        }
      )

      return response.data
    } catch (error) {
      if (error.response?.status === 408) {
        console.log(`Timeout on attempt ${attempt}`)
        if (attempt === retries) {
          throw new Error('Max retries exceeded due to timeouts')
        }
      } else if (error.response?.status === 429) {
        console.log('Rate limited, waiting 60 seconds...')
        await new Promise((resolve) => setTimeout(resolve, 60000))
      } else {
        throw error
      }
    }
  }

  // All attempts exhausted without a successful response
  throw new Error('Max retries exceeded')
}

// Usage
processImageSync('https://example.com/product.jpg', 'scan-001')
  .then((result) => {
    if (result.status === 'DONE') {
      result.results.forEach((detection) => {
        console.log(
          `Found ${detection.class_name} ` +
            `with ${(detection.confidence * 100).toFixed(1)}% confidence`
        )
      })
    }
  })
  .catch((error) => {
    console.error('Processing failed:', error.message)
  })

React Hook

import { useState } from 'react'
import axios from 'axios'

interface CVResult {
  idempotency_id: string
  external_id: string
  status: string
  results: Array<{
    class_name: string
    confidence: number
    bounding_box: {
      x: number
      y: number
      width: number
      height: number
    }
  }>
}

export function useVfrogSync() {
  const [loading, setLoading] = useState(false)
  const [error, setError] = useState<string | null>(null)
  const [result, setResult] = useState<CVResult | null>(null)

  const processImage = async (imageUrl: string, externalId: string) => {
    setLoading(true)
    setError(null)

    try {
      const response = await axios.post(
        process.env.REACT_APP_VFROG_SYNC_URL!,
        {
          external_id: externalId,
          image_url: imageUrl,
        },
        {
          headers: {
            'x-api-key': process.env.REACT_APP_VFROG_API_KEY!,
            'Content-Type': 'application/json',
          },
          timeout: 30000,
        }
      )

      setResult(response.data)
      return response.data
    } catch (err: any) {
      const errorMsg = err.response?.data?.message || err.message
      setError(errorMsg)
      throw err
    } finally {
      setLoading(false)
    }
  }

  return { processImage, loading, error, result }
}

// Usage in component
function ProductScanner() {
  const { processImage, loading, result } = useVfrogSync()

  const handleScan = async (imageUrl: string) => {
    await processImage(imageUrl, `scan-${Date.now()}`)
  }

  return (
    <div>
      {loading && <p>Processing...</p>}
      {result && (
        <div>
          <h3>Results:</h3>
          {result.results.map((item, i) => (
            <div key={i}>
              {item.class_name}: {(item.confidence * 100).toFixed(1)}%
            </div>
          ))}
        </div>
      )}
    </div>
  )
}

Performance Optimization

1. Image Optimization

Smaller images process faster:

from PIL import Image
import io
import requests

def optimize_image(image_url, max_size=1024):
    # Download image
    response = requests.get(image_url)
    img = Image.open(io.BytesIO(response.content))

    # Resize if too large
    if max(img.size) > max_size:
        img.thumbnail((max_size, max_size), Image.LANCZOS)

    # Convert to JPEG for smaller size
    buffer = io.BytesIO()
    img.convert('RGB').save(buffer, format='JPEG', quality=85)

    # Upload optimized image and return URL
    # (upload_to_cdn is a placeholder for your own storage/CDN upload helper)
    optimized_url = upload_to_cdn(buffer.getvalue())
    return optimized_url
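
If your upload helper returns a public URL, the optimized image can be fed straight into the sync call. This short usage sketch assumes upload_to_cdn is implemented and reuses process_image_sync from the Python example above:

# Usage: shrink the image first, then send the optimized copy to the sync endpoint
optimized_url = optimize_image("https://example.com/product.jpg")
result = process_image_sync(optimized_url, "scan-001")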

2. Caching Results

Cache results for frequently scanned images:

import hashlib
import json
import redis

# Assumes a local Redis instance; adjust connection settings as needed
redis_client = redis.Redis()

def get_cached_result(image_url_hash):
    # Check cache first
    cached = redis_client.get(f"cv:{image_url_hash}")
    if cached:
        return json.loads(cached)
    return None

def process_with_cache(image_url, external_id):
    # Generate hash of image URL
    url_hash = hashlib.md5(image_url.encode()).hexdigest()

    # Check cache
    cached_result = get_cached_result(url_hash)
    if cached_result:
        return cached_result

    # Process image
    result = process_image_sync(image_url, external_id)

    # Cache result for 1 hour
    if result:
        redis_client.setex(
            f"cv:{url_hash}",
            3600,
            json.dumps(result)
        )

    return result

Best Practices

  1. Implement timeouts: Always set client-side timeouts slightly above 29 seconds
  2. Handle rate limits: Implement exponential backoff for 429 errors
  3. Optimize images: Smaller images = faster processing
  4. Cache results: Don't reprocess the same image repeatedly
  5. Handle timeouts gracefully: If sync times out, retry with exponential backoff
  6. Monitor performance: Track processing times and success rates (a minimal sketch follows this list)
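
One lightweight way to track processing times and success rates is to wrap the sync call and record the outcome of each request. The sketch below keeps its counters in memory and assumes process_image_sync from the Python example above; swap in your own metrics backend as needed.

import time

class SyncMetrics:
    """In-memory counters for sync request latency and success rate."""

    def __init__(self):
        self.total = 0
        self.succeeded = 0
        self.latencies_ms = []

    def record(self, image_url, external_id):
        # process_image_sync is the helper defined in the Python example above
        start = time.monotonic()
        result = process_image_sync(image_url, external_id)
        self.latencies_ms.append((time.monotonic() - start) * 1000)
        self.total += 1
        if result and result.get('status') == 'DONE':
            self.succeeded += 1
        return result

    def summary(self):
        avg = sum(self.latencies_ms) / len(self.latencies_ms) if self.latencies_ms else 0
        rate = self.succeeded / self.total if self.total else 0
        return {"requests": self.total, "success_rate": rate, "avg_latency_ms": avg}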

Next Steps