Skip to content

Integration Patterns

This guide covers common patterns for integrating Django Automate with external systems.

Table of Contents

  1. Webhook Integration
  2. LLM Provider Integration
  3. RAG Backend Integration
  4. Connector Development
  5. MCP Server Integration
  6. External API Integration

Webhook Integration

Receiving Webhooks

Django Automate provides a unified webhook endpoint with signature verification.

# urls.py
from automate.views import WebhookView

urlpatterns = [
    path('webhooks/<str:connector_key>/', WebhookView.as_view()),
]

Webhook Flow

sequenceDiagram
    participant External
    participant Webhook as WebhookView
    participant Connector
    participant Outbox

    External->>Webhook: POST /webhooks/stripe/
    Webhook->>Connector: verify_webhook(headers, body)
    Connector-->>Webhook: True
    Webhook->>Outbox: Create OutboxItem
    Webhook-->>External: 202 Accepted + trace_id

Custom Verification

from automate_connectors.base import Connector

class MyWebhookConnector(Connector):
    key = "my_service"

    def verify_webhook(self, headers: dict, raw_body: bytes) -> bool:
        """Custom signature verification."""
        signature = headers.get('X-Signature')
        expected = hmac.new(
            self.secret.encode(),
            raw_body,
            hashlib.sha256
        ).hexdigest()
        return hmac.compare_digest(signature, expected)

LLM Provider Integration

Creating a Provider

from automate_llm.providers.base import BaseLLMProvider
from pydantic import BaseModel

class MyLLMConfig(BaseModel):
    api_key: str
    endpoint: str
    model: str = "default"

class MyLLMProvider(BaseLLMProvider):
    """Custom LLM provider."""

    key = "my_llm"
    display_name = "My LLM Service"

    @classmethod
    def config_schema(cls) -> type[BaseModel]:
        return MyLLMConfig

    def __init__(self, config: dict, ctx):
        self.cfg = MyLLMConfig(**config)
        self.ctx = ctx

    def chat(self, messages: list[dict], **kwargs) -> str:
        """Synchronous chat completion."""
        response = requests.post(
            f"{self.cfg.endpoint}/chat",
            headers={"Authorization": f"Bearer {self.cfg.api_key}"},
            json={"messages": messages, "model": self.cfg.model}
        )
        return response.json()["content"]

    async def achat(self, messages: list[dict], **kwargs) -> str:
        """Async chat completion."""
        async with httpx.AsyncClient() as client:
            response = await client.post(...)
            return response.json()["content"]

    def count_tokens(self, text: str) -> int:
        """Estimate token count."""
        return len(text) // 4  # Rough estimate

Registration

# myapp/apps.py
class MyAppConfig(AppConfig):
    def ready(self):
        from automate_llm.registry import register_provider
        from .providers import MyLLMProvider
        register_provider(MyLLMProvider)

RAG Backend Integration

Custom Retrieval Provider

from rag.providers.base import BaseRetrievalProvider

class PineconeProvider(BaseRetrievalProvider):
    """Pinecone vector database integration."""

    key = "pinecone"

    def __init__(self, config: dict):
        self.index = pinecone.Index(
            config["index_name"],
            api_key=config["api_key"]
        )

    def retrieve(
        self, 
        query_embedding: list[float], 
        top_k: int = 5,
        filters: dict = None
    ) -> list[dict]:
        """Retrieve similar documents."""
        results = self.index.query(
            vector=query_embedding,
            top_k=top_k,
            filter=filters,
            include_metadata=True
        )
        return [
            {
                "id": match.id,
                "score": match.score,
                "content": match.metadata.get("content"),
                "metadata": match.metadata,
            }
            for match in results.matches
        ]

    def index_documents(self, documents: list[dict]) -> int:
        """Index documents into vector store."""
        vectors = [
            {
                "id": doc["id"],
                "values": doc["embedding"],
                "metadata": doc["metadata"],
            }
            for doc in documents
        ]
        self.index.upsert(vectors)
        return len(vectors)

Custom Embedding Provider

from rag.embeddings.base import BaseEmbeddingProvider

class CohereEmbeddingProvider(BaseEmbeddingProvider):
    """Cohere embeddings integration."""

    key = "cohere"
    dimensions = 1024

    def embed(self, texts: list[str]) -> list[list[float]]:
        response = cohere.Client(self.api_key).embed(
            texts=texts,
            model="embed-english-v3.0"
        )
        return response.embeddings

Connector Development

Full Connector Example

from automate_connectors.base import (
    Connector, ActionSpec, TriggerSpec, CapabilitySpec
)
from automate_core.providers.base import ProviderContext
from automate_core.providers.errors import AutomateError, ErrorCodes
from pydantic import BaseModel

class JiraConfig(BaseModel):
    url: str
    email: str
    api_token: str
    project_key: str = "PROJ"

class JiraConnector(Connector):
    """Jira integration connector."""

    key = "jira"
    display_name = "Jira"

    def __init__(self, config: dict, *, ctx: ProviderContext):
        self.cfg = JiraConfig(**config)
        self.ctx = ctx
        self.client = self._create_client()

    @classmethod
    def config_schema(cls) -> type[BaseModel]:
        return JiraConfig

    @classmethod
    def capabilities(cls) -> list[CapabilitySpec]:
        return [
            CapabilitySpec(name="connector", modalities={"action", "trigger"})
        ]

    @classmethod
    def actions(cls) -> list[ActionSpec]:
        return [
            ActionSpec(
                name="create_issue",
                input_schema={
                    "type": "object",
                    "properties": {
                        "summary": {"type": "string"},
                        "description": {"type": "string"},
                        "issue_type": {"type": "string", "default": "Task"},
                    },
                    "required": ["summary"],
                },
                idempotent=False,
            ),
            ActionSpec(
                name="add_comment",
                input_schema={
                    "type": "object",
                    "properties": {
                        "issue_key": {"type": "string"},
                        "body": {"type": "string"},
                    },
                    "required": ["issue_key", "body"],
                },
                idempotent=True,
            ),
        ]

    @classmethod
    def triggers(cls) -> list[TriggerSpec]:
        return [
            TriggerSpec(name="issue_created", verification_method="hmac"),
            TriggerSpec(name="issue_updated", verification_method="hmac"),
        ]

    def execute_action(self, action: str, input_data: dict) -> dict:
        if action == "create_issue":
            return self._create_issue(input_data)
        elif action == "add_comment":
            return self._add_comment(input_data)
        raise AutomateError(ErrorCodes.INVALID_ARGUMENT, f"Unknown: {action}")

    def _create_issue(self, data: dict) -> dict:
        response = self.client.post("/rest/api/3/issue", json={
            "fields": {
                "project": {"key": self.cfg.project_key},
                "summary": data["summary"],
                "description": data.get("description"),
                "issuetype": {"name": data.get("issue_type", "Task")},
            }
        })
        return {"key": response.json()["key"], "id": response.json()["id"]}

    def normalize_error(self, exc: Exception) -> Exception:
        return AutomateError(ErrorCodes.INTERNAL, str(exc), provider=self.key)

MCP Server Integration

Registering MCP Server

from automate.models import MCPServer

# Via Admin UI or programmatically
server = MCPServer.objects.create(
    name="My Tools",
    url="http://localhost:3000/mcp",
    api_key="secret",
    enabled=True,
)

# Sync available tools
server.sync_tools()

Using MCP Tools in Workflows

from automate.services import WorkflowService

service = WorkflowService()
workflow = service.create_workflow(
    name="MCP Workflow",
    steps=[
        {
            "name": "Use MCP Tool",
            "type": "mcp_tool",
            "config": {
                "server": "my_tools",
                "tool": "search_documents",
                "args": {"query": "{{ event.payload.query }}"}
            }
        }
    ]
)

External API Integration

SSRF-Safe Requests

Always use the SSRF-safe client for external requests:

from rag.security.ssrf_client import ssrf_safe_request, SSRFError

try:
    response = ssrf_safe_request(
        "GET",
        url="https://api.example.com/data",
        credentials_ref="secret://my-api-key",
        timeout=30,
        max_size=10 * 1024 * 1024,  # 10MB
    )
except SSRFError as e:
    # Blocked: private IP, redirect attack, etc.
    logger.warning(f"SSRF protection blocked request: {e}")

Rate Limiting External Calls

from automate_core.throttling import RateLimiter

limiter = RateLimiter(key="external_api", rate="100/min")

if limiter.allow():
    response = ssrf_safe_request("GET", url)
else:
    raise RateLimitExceeded("External API rate limit")

Testing Integrations

Contract Tests

from automate_testing.contracts import ConnectorContractTest

class TestJiraConnector(ConnectorContractTest):
    connector_class = JiraConnector

    def get_config(self):
        return {
            "url": "https://test.atlassian.net",
            "email": "test@example.com",
            "api_token": "test-token",
        }

    def test_create_issue_success(self):
        with self.mock_http() as mock:
            mock.post("/rest/api/3/issue", json={"key": "TEST-1"})

            result = self.connector.execute_action("create_issue", {
                "summary": "Test Issue"
            })

            assert result["key"] == "TEST-1"

Integration Test Fixtures

import pytest
from automate_testing.fixtures import connector_factory

@pytest.fixture
def jira_connector(connector_factory):
    return connector_factory(
        JiraConnector,
        config={"url": "...", "email": "...", "api_token": "..."}
    )

def test_full_flow(jira_connector):
    result = jira_connector.execute_action("create_issue", {...})
    assert "key" in result

See Also