Python API Reference
Complete reference for the Django Automate Python API.
Table of Contents
- Runtime Module
- Events Module
- Rules Module
- Secrets Module
- Connectors Module
- LLM Module
- RAG Module
- Outbox Module
Runtime Module
Import: from automate.runtime import Runtime
Runtime Class
The core execution engine for running workflows.
from automate.runtime import Runtime
runtime = Runtime()
runtime.run_execution(execution_id="550e8400-e29b-41d4-a716-446655440000")
run_execution(execution_id: str) -> None
Execute a workflow by execution ID.
Parameters:
- execution_id: UUID of the execution to run
Behavior: - Loads the execution and associated workflow - Executes each step in the workflow graph - Handles retries with exponential backoff - Updates execution status (RUNNING → SUCCESS/FAILED)
Raises:
- Execution.DoesNotExist: If execution not found
Example:
from automate.runtime import Runtime
from automate.models import Execution
# Create an execution
execution = Execution.objects.create(
automation=automation,
context={"order_id": "ORD-123"}
)
# Run it
runtime = Runtime()
runtime.run_execution(execution.id)
# Check result
execution.refresh_from_db()
print(f"Status: {execution.status}") # SUCCESS or FAILED
Events Module
Import: from automate.events import emit_event, run_workflow
emit_event(type: str, payload: dict, dedupe_key: str = None) -> str
Ingest an event into the outbox for processing.
Parameters:
- type: Event type identifier (e.g., "order.created")
- payload: Event data dictionary
- dedupe_key: Optional deduplication key (prevents duplicate processing)
Returns: Event UUID string
Example:
from automate.events import emit_event
# Simple event
event_id = emit_event(
type="order.created",
payload={
"order_id": "ORD-123",
"amount": 99.99,
"customer_email": "john@example.com"
}
)
print(f"Event {event_id} queued for processing")
# With deduplication (idempotent)
event_id = emit_event(
type="payment.received",
payload={"payment_id": "PAY-456"},
dedupe_key="payment-PAY-456" # Won't create duplicate if called again
)
run_workflow(workflow_slug: str, payload: dict) -> str
Directly execute a workflow, bypassing the rule engine.
Parameters:
- workflow_slug: Slug identifier of the workflow
- payload: Input data for the workflow
Returns: Execution UUID string
Example:
from automate.events import run_workflow
# Run workflow directly
execution_id = run_workflow(
workflow_slug="send-welcome-email",
payload={
"user_id": "USR-123",
"email": "newuser@example.com",
"name": "John Doe"
}
)
print(f"Started execution: {execution_id}")
Rules Module
Import: from automate.rules import evaluate, compile_rule
evaluate(rule_spec: dict, context: dict) -> bool
Evaluate a JSONLogic rule against a context.
Parameters:
- rule_spec: JSONLogic rule specification
- context: Data context for evaluation
Returns: Boolean result of rule evaluation
Example:
from automate.rules import evaluate
# Simple comparison
result = evaluate(
rule_spec={">=": [{"var": "amount"}, 100]},
context={"amount": 150}
)
print(result) # True
# Complex rule with AND/OR
rule = {
"and": [
{"==": [{"var": "status"}, "active"]},
{"or": [
{">": [{"var": "amount"}, 1000]},
{"==": [{"var": "priority"}, "high"]}
]}
]
}
context = {"status": "active", "amount": 500, "priority": "high"}
result = evaluate(rule, context)
print(result) # True
compile_rule(rule_spec: dict) -> CompiledRule
Pre-compile a rule for repeated evaluation.
Parameters:
- rule_spec: JSONLogic rule specification
Returns: CompiledRule object with evaluate(context) method
Example:
from automate.rules import compile_rule
# Compile once
rule = compile_rule({">=": [{"var": "score"}, 80]})
# Evaluate many times
for student in students:
if rule.evaluate({"score": student.score}):
print(f"{student.name} passed!")
Supported Operations
| Operation | Example | Description |
|---|---|---|
== |
{"==": [{"var": "a"}, 1]} |
Equality |
!= |
{"!=": [{"var": "a"}, 1]} |
Inequality |
>, >=, <, <= |
{">": [{"var": "a"}, 10]} |
Comparison |
and |
{"and": [...]} |
Logical AND |
or |
{"or": [...]} |
Logical OR |
! |
{"!": {...}} |
Logical NOT |
in |
{"in": ["a", {"var": "list"}]} |
Contains |
var |
{"var": "path.to.value"} |
Variable access |
if |
{"if": [cond, then, else]} |
Conditional |
Secrets Module
Import: from automate.secrets import resolve, SecretRef
resolve(ref: str) -> str
Resolve a secret reference to its value.
Parameters:
- ref: Secret reference string (e.g., $secret:API_KEY)
Returns: Decrypted secret value
Raises:
- SecretNotFoundError: If secret doesn't exist
- SecretAccessDenied: If access is not permitted
Example:
from automate.secrets import resolve
# Resolve a secret
api_key = resolve("$secret:STRIPE_API_KEY")
# Use in code
import stripe
stripe.api_key = api_key
SecretRef Class
Type-safe secret reference for configuration.
from automate.secrets import SecretRef
class MyConfig:
api_key: SecretRef = SecretRef("$secret:API_KEY")
# In Pydantic models
from pydantic import BaseModel
class ConnectorConfig(BaseModel):
api_key: SecretRef
class Config:
arbitrary_types_allowed = True
Secret Reference Formats
| Format | Example | Description |
|---|---|---|
$secret:KEY |
$secret:API_KEY |
Database-stored secret |
$env:VAR |
$env:DATABASE_URL |
Environment variable |
$vault:path |
$vault:secret/api |
HashiCorp Vault |
Connectors Module
Import: from automate_connectors import Connector, registry
Connector Base Class
from automate_connectors.base import Connector, ActionSpec, TriggerSpec
class MyConnector(Connector):
key = "my_connector"
display_name = "My Connector"
@classmethod
def actions(cls) -> list[ActionSpec]:
return [
ActionSpec(
name="send_message",
input_schema={"type": "object", "properties": {...}},
idempotent=True
)
]
def execute_action(self, action: str, input_data: dict) -> dict:
if action == "send_message":
return self._send_message(input_data)
raise ValueError(f"Unknown action: {action}")
def normalize_error(self, exc: Exception) -> Exception:
return AutomateError(ErrorCodes.INTERNAL, str(exc))
Registry
from automate_connectors.registry import registry
# Get a connector
connector_cls = registry.get_connector("slack")
connector = connector_cls(config={...}, ctx=context)
# List all connectors
for key in registry.list_connectors():
print(key)
# Register a connector
registry.register("my_connector", MyConnector)
LLM Module
Import: from automate_llm import chat, count_tokens, get_provider
chat(messages: list[dict], **kwargs) -> str
Send a chat completion request to the configured LLM.
Parameters:
- messages: List of message dictionaries with role and content
- **kwargs: Provider-specific options (model, temperature, etc.)
Returns: Generated text response
Example:
from automate_llm import chat
response = chat([
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "What is the capital of France?"}
])
print(response) # "The capital of France is Paris."
# With options
response = chat(
messages=[{"role": "user", "content": "Write a haiku."}],
model="gpt-4",
temperature=0.9,
max_tokens=100
)
count_tokens(text: str, model: str = None) -> int
Estimate token count for text.
Parameters:
- text: Input text
- model: Optional model name for accurate counting
Returns: Estimated token count
Example:
from automate_llm import count_tokens
tokens = count_tokens("Hello, world!")
print(f"Text uses approximately {tokens} tokens")
get_provider(name: str) -> LLMProvider
Get an LLM provider instance.
Parameters:
- name: Provider name (openai, anthropic, etc.)
Returns: LLMProvider instance
Example:
from automate_llm import get_provider
provider = get_provider("openai")
response = provider.chat([
{"role": "user", "content": "Hello!"}
])
RAG Module
Import: from rag import query, index_documents
query(endpoint: str, query_text: str, **kwargs) -> list[dict]
Execute a RAG query.
Parameters:
- endpoint: RAG endpoint slug
- query_text: Natural language query
- top_k: Number of results (default: 5)
- filters: Optional metadata filters
Returns: List of matching documents with scores
Example:
from rag import query
results = query(
endpoint="support-docs",
query_text="How do I reset my password?",
top_k=3,
filters={"category": "authentication"}
)
for doc in results:
print(f"Score: {doc['score']:.2f}")
print(f"Content: {doc['content'][:100]}...")
index_documents(endpoint: str, documents: list[dict]) -> int
Index documents into a RAG endpoint.
Parameters:
- endpoint: RAG endpoint slug
- documents: List of document dictionaries
Returns: Number of documents indexed
Example:
from rag import index_documents
count = index_documents(
endpoint="support-docs",
documents=[
{
"id": "doc-001",
"content": "To reset your password, go to Settings...",
"metadata": {"category": "authentication", "author": "admin"}
},
{
"id": "doc-002",
"content": "Two-factor authentication adds an extra layer...",
"metadata": {"category": "security"}
}
]
)
print(f"Indexed {count} documents")
Outbox Module
Import: from automate_core.outbox import OutboxStore, OutboxReaper
OutboxStore
Claim and process outbox items.
from automate_core.outbox.store import SkipLockedClaimOutboxStore
store = SkipLockedClaimOutboxStore(lease_seconds=300)
# Claim items
items = store.claim_batch("worker-1", limit=10)
for item in items:
try:
process(item)
store.mark_success(item.id, "worker-1")
except TransientError as e:
store.mark_retry(item.id, "worker-1", next_attempt, str(e))
except PermanentError as e:
store.mark_failed(item.id, "worker-1", str(e))
OutboxReaper
Recover stuck items.
from automate_core.outbox.reaper import OutboxReaper
reaper = OutboxReaper(stale_threshold_seconds=600)
reaped = reaper.reap_stale_items()
print(f"Recovered {reaped} stuck items")
See Outbox Pattern Reference for detailed documentation.
Type Annotations
All public APIs include full type annotations:
from typing import Any
def emit_event(
type: str,
payload: dict[str, Any],
dedupe_key: str | None = None
) -> str: ...
def evaluate(
rule_spec: dict[str, Any],
context: dict[str, Any]
) -> bool: ...
Async Support
Most APIs have async variants:
from automate_llm import achat
response = await achat([
{"role": "user", "content": "Hello!"}
])