Skip to main content
The InProcessTaskApp utility enables running task apps entirely within your Python script, eliminating the need for separate terminal processes or manual tunnel management. It automatically starts a FastAPI server, opens a Cloudflare tunnel, and provides the tunnel URL for GEPA/MIPRO optimization jobs.

Quick Start

from synth_ai.task import InProcessTaskApp
from your_task_app import build_config

async def main():
    async with InProcessTaskApp(
        config_factory=build_config,
        port=8114,
    ) as task_app:
        print(f"Task app running at: {task_app.url}")
        # Use task_app.url for your optimization jobs
        # The tunnel and server will automatically clean up on exit

asyncio.run(main())

Features

Automatic Server Management

Starts uvicorn server in background thread

Automatic Tunnel Creation

Opens Cloudflare quick tunnel automatically

Port Conflict Handling

Automatically finds available ports if requested port is busy

Signal Handling

Graceful shutdown on SIGINT/SIGTERM

Observability

Structured logging and optional callbacks

Multiple Input Methods

Supports app, config, config_factory, or file path

API Reference

InProcessTaskApp

Context manager for running task apps in-process with automatic Cloudflare tunnels.

Parameters

ParameterTypeDefaultDescription
appOptional[ASGIApplication]NoneFastAPI app instance (most direct)
configOptional[TaskAppConfig]NoneTaskAppConfig object
config_factoryOptional[Callable[[], TaskAppConfig]]NoneCallable that returns TaskAppConfig
task_app_pathOptional[Path | str]NonePath to task app .py file (fallback)
portint8114Local port to run server on (must be in range [1024, 65535])
hoststr"127.0.0.1"Host to bind to (must be localhost for security)
tunnel_modestr"quick"Tunnel mode (“quick” for ephemeral tunnels)
api_keyOptional[str]NoneAPI key for health checks (defaults to ENVIRONMENT_API_KEY env var)
health_check_timeoutfloat30.0Max time to wait for health check in seconds
auto_find_portboolTrueAutomatically find available port if requested port is busy
on_startOptional[Callable[[InProcessTaskApp], None]]NoneCallback called when task app starts
on_stopOptional[Callable[[InProcessTaskApp], None]]NoneCallback called when task app stops

Attributes

  • url (Optional[str]): The Cloudflare tunnel URL (available after __aenter__)
  • port (int): The actual port the server is running on (may differ from requested if auto_find_port=True)
  • host (str): The host the server is bound to
  • tunnel_mode (str): The tunnel mode being used

Raises

  • ValueError: If multiple or no input methods provided, or invalid parameters
  • FileNotFoundError: If task_app_path doesn’t exist
  • RuntimeError: If health check fails or port conflicts can’t be resolved

Usage Examples

from synth_ai.task import InProcessTaskApp
from heartdisease_task_app import build_config

async def main():
    async with InProcessTaskApp(
        config_factory=build_config,
        port=8114,
    ) as task_app:
        print(f"Task app URL: {task_app.url}")
        # Your optimization code here

asyncio.run(main())

Using Task App File Path

from synth_ai.task import InProcessTaskApp

async def main():
    async with InProcessTaskApp(
        task_app_path="examples/task_apps/banking77/banking77_task_app.py",
        port=8114,
    ) as task_app:
        print(f"Task app URL: {task_app.url}")
        # Your optimization code here

asyncio.run(main())

With Callbacks and Custom Port

from synth_ai.task import InProcessTaskApp

def on_start(task_app):
    print(f"✅ Task app started on port {task_app.port}")
    print(f"🌐 Tunnel URL: {task_app.url}")

def on_stop(task_app):
    print(f"🛑 Task app stopped")

async def main():
    async with InProcessTaskApp(
        config_factory=build_config,
        port=9000,
        auto_find_port=True,  # Will find available port if 9000 is busy
        on_start=on_start,
        on_stop=on_stop,
    ) as task_app:
        # Your optimization code here
        pass

asyncio.run(main())

Full GEPA Integration

from synth_ai.task import InProcessTaskApp
from synth_ai.api.train.prompt_learning import PromptLearningJob
from heartdisease_task_app import build_config

async def main():
    # Start task app in-process
    async with InProcessTaskApp(
        config_factory=build_config,
        port=8114,
    ) as task_app:
        print(f"Task app running at: {task_app.url}")
        
        # Create GEPA job with task app URL
        job = PromptLearningJob(
            backend_url="http://localhost:8000",
            api_key="your-api-key",
        )
        
        # Submit job
        job_id = await job.submit(
            config_path="configs/heartdisease_gepa.toml",
            task_app_url=task_app.url,
        )
        
        # Poll until complete
        results = await job.poll_until_complete(job_id)
        print(f"Best score: {results.best_score}")

asyncio.run(main())

Port Conflict Handling

The utility automatically handles port conflicts:
  • auto_find_port=True (default): If requested port is busy, automatically finds next available port
  • auto_find_port=False: Attempts to kill process on port, then raises error if still busy
# Will automatically find available port if 8114 is busy
async with InProcessTaskApp(
    config_factory=build_config,
    port=8114,
    auto_find_port=True,  # Default
) as task_app:
    print(f"Running on port {task_app.port}")  # May differ from 8114

Input Validation

The utility validates all inputs with clear error messages:
  • Port: Must be in range [1024, 65535]
  • Host: Must be 127.0.0.1, localhost, or 0.0.0.0 (security requirement)
  • Tunnel Mode: Currently only "quick" is supported
  • Task App Path: Must exist and be a .py file
  • Input Methods: Exactly one of app, config, config_factory, or task_app_path must be provided

Logging

The utility uses Python’s logging module:
import logging

# Configure logging level
logging.basicConfig(level=logging.INFO)

# Now you'll see INFO logs for lifecycle events
async with InProcessTaskApp(...) as task_app:
    pass
Log levels:
  • INFO: Major lifecycle events (start, stop, tunnel URL)
  • DEBUG: Detailed operations (port checks, health checks)
  • WARNING: Port conflicts, callback exceptions

Signal Handling

The utility automatically handles SIGINT/SIGTERM signals for graceful shutdown:
  • All active instances are cleaned up on signal
  • Prevents orphaned processes
  • Works seamlessly with context manager cleanup

Requirements

  • Python >= 3.11
  • cloudflared binary (will auto-install if missing via ensure_cloudflared_installed())
  • Task app must expose /health endpoint
  • Task app must accept X-API-Key header for authentication

Troubleshooting

Port Already in Use

If you see “address already in use” errors:
# Option 1: Use auto_find_port (recommended)
async with InProcessTaskApp(..., auto_find_port=True) as task_app:
    pass

# Option 2: Manually kill process on port
# On macOS/Linux:
# lsof -ti:8114 | xargs kill -9

Health Check Timeout

If health check times out:
  1. Verify task app has /health endpoint
  2. Verify task app accepts X-API-Key header
  3. Increase timeout: health_check_timeout=60.0

Tunnel Not Opening

If tunnel fails to open:
  1. Verify cloudflared is installed: which cloudflared
  2. Check network connectivity
  3. Review logs for detailed error messages