ArzuleRun provides fine-grained control over trace collection. Use it when you need to:
  • Write traces to a specific destination
  • Access the run ID during execution
  • Control run boundaries explicitly

Basic usage

from arzule_ingest import ArzuleRun
from arzule_ingest.sinks import JsonlFileSink
from arzule_ingest.crewai import instrument_crewai

# Instrument your framework (call once at startup)
instrument_crewai()

# Create a sink
sink = JsonlFileSink("traces/output.jsonl")

# Run your agent inside the context (crew is your existing CrewAI crew, defined elsewhere)
with ArzuleRun(
    tenant_id="your-tenant-id",
    project_id="your-project-id",
    sink=sink
) as run:
    result = crew.kickoff()
    print(f"Run completed: {run.run_id}")
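To check what a run produced, the output file can be read back after the context exits. The following is a minimal sketch, assuming JsonlFileSink writes one JSON object per line and that each object carries an event_type key like the TraceEvent field shown on this page. The on-disk schema is not documented here, and count_event_types is a hypothetical helper, not part of arzule_ingest.

```python
import json
from collections import Counter
from pathlib import Path


def count_event_types(path):
    """Count events per event_type in a JSONL trace file.

    Assumes one JSON object per line. The "event_type" key is an
    assumption about the on-disk schema, so records without it are
    counted under "unknown".
    """
    counts = Counter()
    with Path(path).open(encoding="utf-8") as fh:
        for line in fh:
            line = line.strip()
            if not line:
                continue  # tolerate blank lines
            event = json.loads(line)
            counts[event.get("event_type", "unknown")] += 1
    return counts
```

Calling count_event_types("traces/output.jsonl") after the run gives a quick view of which event types were captured.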

Constructor parameters

ArzuleRun(
    tenant_id: str,                # Required: your tenant identifier
    project_id: str,               # Required: project to associate traces with
    sink: BaseSink,                # Required: where to send events
    run_id: str | None = None,     # Optional: custom run ID (auto-generated if omitted)
    metadata: dict | None = None   # Optional: custom metadata attached to the run
)

Properties

Property      Type       Description
run_id        str        UUID identifying this run
tenant_id     str        Tenant identifier
project_id    str        Project identifier
started_at    datetime   When the run began
event_count   int        Number of events captured

Methods

emit(event: TraceEvent)

Manually emit a trace event:
from arzule_ingest import ArzuleRun, TraceEvent

with ArzuleRun(...) as run:
    run.emit(TraceEvent(
        event_type="custom.event",
        summary="Something happened",
        attrs_compact={"key": "value"}
    ))

flush()

Force-flush any buffered events:
with ArzuleRun(...) as run:
    crew.kickoff()
    run.flush()  # Ensure all events are written

Custom run IDs

Provide your own run ID for correlation with external systems:
import uuid

custom_id = str(uuid.uuid4())

with ArzuleRun(
    tenant_id="...",
    project_id="...",
    sink=sink,
    run_id=custom_id
) as run:
    # run.run_id == custom_id
    pass
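When the external system already has its own job identifier, a deterministic UUID can make the correlation reproducible: the same job always maps to the same run ID. This is a sketch under assumptions: it presumes run_id accepts any UUID string, and both JOB_NAMESPACE and run_id_for_job are hypothetical names chosen for illustration.

```python
import uuid

# Hypothetical namespace for this illustration; any fixed UUID works,
# as long as it stays the same across processes.
JOB_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_DNS, "jobs.example.com")


def run_id_for_job(job_id: str) -> str:
    """Derive a stable UUID string from an external job identifier."""
    return str(uuid.uuid5(JOB_NAMESPACE, job_id))
```

Passing run_id=run_id_for_job("nightly-report-42") would then yield the same run ID on every retry of that job. If each retry should be a distinct run, uuid.uuid4() as shown above is the better fit.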

Run metadata

Attach custom metadata to runs:
with ArzuleRun(
    tenant_id="...",
    project_id="...",
    sink=sink,
    metadata={
        "environment": "production",
        "version": "1.2.3",
        "triggered_by": "cron"
    }
) as run:
    pass
Metadata appears in the dashboard and can be used for filtering.
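Metadata is often assembled from the process environment rather than hard-coded. A small sketch of that pattern follows. The variable names APP_ENV and APP_VERSION and the helper build_run_metadata are hypothetical, and keeping values as flat strings is an assumption made here to keep filtering simple.

```python
import os
import platform


def build_run_metadata(extra=None):
    """Collect run metadata from the environment.

    APP_ENV and APP_VERSION are hypothetical variable names used
    only for this illustration.
    """
    metadata = {
        "environment": os.environ.get("APP_ENV", "development"),
        "version": os.environ.get("APP_VERSION", "unknown"),
        "hostname": platform.node(),
    }
    if extra:
        metadata.update(extra)  # caller-supplied keys take precedence
    return metadata
```

The result can be passed directly as metadata=build_run_metadata({"triggered_by": "cron"}).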

Nested runs

Runs cannot be nested. Starting a new ArzuleRun while another is still active raises a RuntimeError:
with ArzuleRun(...) as outer:
    with ArzuleRun(...) as inner:  # Raises RuntimeError
        pass
If you need to track sub-executions, use spans within a single run.

Error handling

If an exception occurs inside the run, it’s recorded and re-raised:
try:
    with ArzuleRun(...) as run:
        raise ValueError("Something went wrong")
except ValueError:
    pass  # Exception is re-raised after run.end event is emitted
The run’s final status is set to error, and the exception details are captured with it.
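The record-then-re-raise behavior follows from how Python context managers work: __exit__ receives the exception, and a falsy return value lets it propagate. The sketch below shows the pattern with a stand-in class. RecordingRun, its attributes, the "ok" status, and the "run.start" event name are assumptions for illustration; only the run.end event and the error status come from this page.

```python
class RecordingRun:
    """Illustrative stand-in that records failures. Not the real ArzuleRun."""

    def __init__(self):
        self.status = "running"
        self.error = None
        self.events = []

    def __enter__(self):
        self.events.append("run.start")  # assumed event name
        return self

    def __exit__(self, exc_type, exc, tb):
        if exc_type is not None:
            self.status = "error"
            self.error = f"{exc_type.__name__}: {exc}"
        else:
            self.status = "ok"  # assumed name for the success status
        self.events.append("run.end")
        return False  # falsy: the original exception propagates to the caller
```

Because the exception still reaches the caller, existing try/except logic around the run keeps working unchanged.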

Async support

For async code, use async with:
async with ArzuleRun(...) as run:
    result = await crew.kickoff_async()  # CrewAI's async entry point
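For readers less familiar with async with, the statement relies on the __aenter__ and __aexit__ methods. The following self-contained sketch shows the shape of an async-capable run. AsyncSketchRun and fake_kickoff are hypothetical stand-ins, and the "run.start" event name is assumed; nothing here is taken from the ArzuleRun source.

```python
import asyncio


class AsyncSketchRun:
    """Illustrative stand-in supporting `async with`. Not the real ArzuleRun."""

    def __init__(self):
        self.events = []

    async def __aenter__(self):
        self.events.append("run.start")  # assumed event name
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self.events.append("run.end")
        return False  # let exceptions propagate


async def fake_kickoff():
    """Stand-in for an awaitable agent call."""
    await asyncio.sleep(0)
    return "done"


async def main():
    async with AsyncSketchRun() as run:
        result = await fake_kickoff()
    return result, run.events
```

Running asyncio.run(main()) drives the whole flow from synchronous code.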

Example: Multiple runs

Process multiple inputs as separate runs:
from arzule_ingest import ArzuleRun
from arzule_ingest.sinks import HttpBatchSink

sink = HttpBatchSink(
    endpoint_url="https://ingest.arzule.com",
    api_key="..."
)

topics = ["AI safety", "Climate tech", "Quantum computing"]

for topic in topics:
    with ArzuleRun(
        tenant_id="...",
        project_id="...",
        sink=sink,
        metadata={"topic": topic}
    ) as run:
        result = crew.kickoff(inputs={"topic": topic})
        print(f"{topic}: {run.run_id}")
