Sinks determine the destination for your trace events. The SDK includes several built-in sinks for different use cases.

Available sinks

| Sink | Use case |
| --- | --- |
| HttpBatchSink | Send to Arzule cloud (production) |
| JsonlFileSink | Write to local file (development) |
| CompositeSink | Send to multiple destinations |

HttpBatchSink

Sends events to the Arzule ingest API in batches.
from arzule_ingest.sinks import HttpBatchSink

sink = HttpBatchSink(
    endpoint_url="https://ingest.arzule.com",
    api_key="your-api-key",
    batch_size=100,       # Events per batch (default: 100)
    flush_interval=5.0,   # Seconds between flushes (default: 5)
    timeout=30.0,         # Request timeout in seconds (default: 30)
    max_retries=3         # Retry failed requests (default: 3)
)

Parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| endpoint_url | str | Required | Ingest API URL |
| api_key | str | Required | API key for authentication |
| batch_size | int | 100 | Maximum events per batch |
| flush_interval | float | 5.0 | Seconds between automatic flushes |
| timeout | float | 30.0 | HTTP request timeout in seconds |
| max_retries | int | 3 | Retry attempts for failed requests |

Automatic batching

Events are buffered and sent in batches for efficiency. A batch is sent when any of the following occurs:
  • batch_size events have accumulated
  • flush_interval seconds have passed since the last flush
  • The run ends (explicit flush; see the sketch below)
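
If you need to push buffered events out early, a minimal sketch, assuming the sink exposes the flush() method defined by BaseSink later on this page:

from arzule_ingest.sinks import HttpBatchSink

sink = HttpBatchSink(
    endpoint_url="https://ingest.arzule.com",
    api_key="your-api-key"
)

# ... events are emitted through a run here ...

# Push any buffered events out now, rather than waiting for
# batch_size or flush_interval to trigger an automatic send.
sink.flush()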

JsonlFileSink

Writes events to a local JSONL (JSON Lines) file. Each line is a complete JSON object.
from arzule_ingest.sinks import JsonlFileSink

sink = JsonlFileSink(
    path="traces/output.jsonl",
    append=True  # Append to existing file (default: True)
)

Parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| path | str | Required | File path to write to |
| append | bool | True | Append to existing file or overwrite |

Output format

{"schema_version":"trace_event.v0_1","run_id":"...","event_type":"run.start",...}
{"schema_version":"trace_event.v0_1","run_id":"...","event_type":"agent.execution.start",...}
{"schema_version":"trace_event.v0_1","run_id":"...","event_type":"tool.call.start",...}

Development workflow

Use JsonlFileSink during development, then view traces with the CLI:
# Timeline view
arzule view traces/output.jsonl

# Statistics
arzule stats traces/output.jsonl

CompositeSink

Sends events to multiple sinks simultaneously. Useful for:
  • Local debugging while sending to production
  • Redundant storage
  • Splitting traces between systems
from arzule_ingest.sinks import CompositeSink, HttpBatchSink, JsonlFileSink

# Send to both Arzule cloud and a local file
sink = CompositeSink([
    HttpBatchSink(
        endpoint_url="https://ingest.arzule.com",
        api_key="..."
    ),
    JsonlFileSink("traces/backup.jsonl")
])

Error handling

If one sink fails, others continue to receive events. Errors are logged but don’t stop the run.
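
To illustrate that behavior, here is a sketch of the idea (not the actual CompositeSink implementation):

import logging

logger = logging.getLogger(__name__)

class FanoutSketch:
    """Illustration only: forward each event to every child sink,
    logging failures instead of raising them."""

    def __init__(self, sinks):
        self.sinks = sinks

    def emit(self, event):
        for sink in self.sinks:
            try:
                sink.emit(event)
            except Exception:
                # A failing sink is logged and skipped, so the
                # remaining sinks still receive the event.
                logger.exception("sink %r failed to emit", sink)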

Custom sinks

Implement the BaseSink interface for custom destinations:
from arzule_ingest.sinks import BaseSink
from arzule_ingest import TraceEvent

class MyCustomSink(BaseSink):
    def __init__(self, config):
        self.config = config
    
    def emit(self, event: TraceEvent) -> None:
        # Send event to your destination
        print(f"Event: {event.event_type}")
    
    def flush(self) -> None:
        # Flush any buffered events
        pass
    
    def close(self) -> None:
        # Clean up resources
        pass
Use your custom sink:
from arzule_ingest import ArzuleRun

sink = MyCustomSink(config={...})

with ArzuleRun(
    tenant_id="...",
    project_id="...",
    sink=sink
) as run:
    pass
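
For a slightly more concrete example, here is a hypothetical custom sink that buffers events in memory and writes one line per event on flush. The serialization shown is an assumption; adapt it to the real TraceEvent schema:

import json
from arzule_ingest import TraceEvent
from arzule_ingest.sinks import BaseSink

class BufferedLineSink(BaseSink):
    """Hypothetical sink: buffer events, write them out on flush."""

    def __init__(self, path: str):
        self.path = path
        self.buffer: list[TraceEvent] = []

    def emit(self, event: TraceEvent) -> None:
        self.buffer.append(event)

    def flush(self) -> None:
        with open(self.path, "a") as f:
            for event in self.buffer:
                # Assumption: only event_type is persisted here; a real
                # sink would serialize the full event payload.
                f.write(json.dumps({"event_type": event.event_type}) + "\n")
        self.buffer.clear()

    def close(self) -> None:
        self.flush()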

Sink selection by environment

A common pattern is selecting sinks based on environment:
import os
from arzule_ingest.sinks import HttpBatchSink, JsonlFileSink

def get_sink():
    env = os.getenv("ENVIRONMENT", "development")
    
    if env == "production":
        return HttpBatchSink(
            endpoint_url="https://ingest.arzule.com",
            api_key=os.getenv("ARZULE_API_KEY")
        )
    else:
        return JsonlFileSink("traces/dev.jsonl")

sink = get_sink()
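
The selected sink can then be passed straight to a run, reusing the ArzuleRun pattern shown above:

from arzule_ingest import ArzuleRun

with ArzuleRun(
    tenant_id="...",
    project_id="...",
    sink=get_sink()
) as run:
    pass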

Next steps