Skip to main content

Medical diagnosis workflow

From keywordsai-tracing/examples/medical_agent/Main.py. Runs a complete agent workflow using the workflow, task, and tool decorators, with concurrent execution and saving the result to a file.
from keywordsai_tracing.decorators import workflow, task, tool

@tool(name="Load Medical Report")
def load_medical_report():
    """Read the medical report file and return its full text.

    The file location comes from ``MEDICAL_REPORT_PATH``, which is defined
    outside this snippet.
    """
    with open(MEDICAL_REPORT_PATH, "r") as report_file:
        return report_file.read()

@task(name="Create Agents")
def create_agents(medical_report):
    """Build one specialist agent per discipline from the same report.

    Args:
        medical_report: The report passed unchanged to every agent constructor.

    Returns:
        A dict mapping the specialist name to its agent instance.
    """
    agents = {}
    agents["Cardiologist"] = Cardiologist(medical_report)
    agents["Psychologist"] = Psychologist(medical_report)
    agents["Pulmonologist"] = Pulmonologist(medical_report)
    return agents

@workflow(name="AI Medical Diagnosis Workflow")
def run_complete_workflow():
    """Run the diagnosis pipeline end to end.

    Steps, in order: load the report, create the specialist agents, run
    them, combine their responses into a final diagnosis, and save it.

    Returns:
        Whatever ``save_diagnosis`` returns (the output path).
    """
    report_text = load_medical_report()
    specialists = create_agents(report_text)
    opinions = run_specialist_agents(specialists)
    diagnosis = run_multidisciplinary_analysis(opinions)
    return save_diagnosis(diagnosis)

Update span during OpenAI call

From examples/simple_span_updating_example.py. Uses get_client().update_current_span(...).
from keywordsai_tracing import KeywordsAITelemetry, get_client, workflow
from openai import OpenAI

@workflow(name="simple_span_updating_example")
def simple_span_updating_example(prompt: str = "Hello, world!"):
    """Send ``prompt`` to gpt-4o, then update the current span.

    Args:
        prompt: Text sent as the single user message.

    Returns:
        The message of the first choice in the completion.
    """
    tracing_client = get_client()

    user_message = {"role": "user", "content": prompt}
    completion = OpenAI().chat.completions.create(
        model="gpt-4o",
        messages=[user_message],
    )

    # The span is updated only after the completion call has returned.
    span_params = {"customer_identifier": "updated_customer_id"}
    tracing_client.update_current_span(keywordsai_params=span_params)

    return completion.choices[0].message

Client API: span updates, events, exceptions

From examples/span_updating_example.py. Demonstrates span updates, events, and exception recording.
from keywordsai_tracing import get_client, workflow, task
from opentelemetry.trace import StatusCode

@workflow(name="data_processing_workflow")
def data_processing_workflow(data):
    """Annotate the current span, emit a start event, then validate ``data``.

    Args:
        data: Any value; its string form is measured for the span metadata
            and the event payload.

    Returns:
        The result of ``validate_data(data)``.
    """
    tracing_client = get_client()

    # Length of the string form, reported on both the span and the event.
    text_length = len(str(data))

    group_params = {
        "trace_group_identifier": "data-processing-group",
        "metadata": {"data_size": text_length},
    }
    stage_attributes = {"custom.processing_stage": "validation"}
    tracing_client.update_current_span(
        keywordsai_params=group_params,
        attributes=stage_attributes,
    )

    tracing_client.add_event("validation_started", {"input_length": text_length})
    return validate_data(data)

@task(name="validate_data")
def validate_data(data):
    """Validate ``data`` and record the outcome on the current span.

    Args:
        data: Value to validate; it must be truthy and its string form must
            be at least three characters long.

    Returns:
        The string ``"validated_"`` followed by ``data``.

    Raises:
        ValueError: If ``data`` is falsy or too short. The exception is
            recorded on the span and then re-raised.
    """
    tracing_client = get_client()
    try:
        too_short = not data or len(str(data)) < 3
        if too_short:
            raise ValueError("Data is too short for processing")
        tracing_client.update_current_span(
            status=StatusCode.OK,
            attributes={"validation.result": "success"},
        )
        return f"validated_{data}"
    except Exception as exc:
        # Record the failure on the span, then let the caller see the error.
        tracing_client.record_exception(exc)
        tracing_client.update_current_span(
            attributes={"validation.result": "failed"},
        )
        raise

Custom exporters

From examples/custom_exporter_example.py. File and console exporters.
from keywordsai_tracing import KeywordsAITelemetry
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult

class CustomFileExporter(SpanExporter):
    """Span exporter that appends one placeholder line per span to a file."""

    def __init__(self, filename: str = "my_traces.jsonl"):
        # Path of the file that export() appends to.
        self.filename = filename

    def export(self, spans):
        """Append one line for each span in ``spans`` and report success."""
        with open(self.filename, 'a') as sink:
            sink.writelines("...json line...\n" for _ in spans)
        return SpanExportResult.SUCCESS

# Hand the file exporter to the telemetry object as its custom exporter.
file_exporter = CustomFileExporter()
telemetry = KeywordsAITelemetry(custom_exporter=file_exporter)
tracer = telemetry.tracer.get_tracer()

# Open and immediately close a single span named "parent_operation".
with tracer.start_as_current_span("parent_operation"):
    pass

# Presumably pushes any pending spans to the exporter - confirm in the SDK docs.
telemetry.flush()

Backend export patterns

From examples/backend_trace_collection_example.py. Direct logging and safe trace collection.
from keywordsai_tracing import KeywordsAITelemetry, workflow, task
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult

class DirectLoggingExporter(SpanExporter):
    """Span exporter that prints each span's name to standard output."""

    def export(self, spans):
        """Print the name of every span in ``spans`` and report success."""
        # The generator is lazy, so names are read and printed one at a time.
        for span_name in (exported.name for exported in spans):
            print(span_name)
        return SpanExportResult.SUCCESS

# Batching is switched off here; presumably spans then reach the exporter as
# they finish instead of being queued - confirm against the SDK docs.
telemetry = KeywordsAITelemetry(
    custom_exporter=DirectLoggingExporter(),
    is_batching_enabled=False,
)

@workflow(name="example_workflow")
def example_workflow():
    """Define a nested task, run it once, and return its result."""

    @task(name="task_1")
    def task_1():
        """Return a fixed placeholder result."""
        return "result"

    outcome = task_1()
    return outcome

Threading and context experiments

From examples/threading_context_experiment.py and run_threading_experiment.py. Shows OpenTelemetry context propagation, manual attach/detach, and ThreadPoolExecutor behavior.
python examples/run_threading_experiment.py

Traces API example

From examples/traces_API_examples/list_traces.py. Fetches traces via the API.
import requests, os
from dotenv import load_dotenv
# Load variables from a local .env file into the process environment.
load_dotenv()

# Fail early with a clear message instead of posting to the literal
# "None/v1/traces" when the variable is missing.
base_url = os.getenv("KEYWORDSAI_BASE_URL")
if not base_url:
    raise RuntimeError(
        "KEYWORDSAI_BASE_URL is not set; define it in the environment or .env"
    )

# Trailing slashes are stripped so the joined URL has exactly one separator.
url = f"{base_url.rstrip('/')}/v1/traces"

# NOTE(review): no Authorization header is sent - confirm whether this
# endpoint requires one.
# The timeout keeps the script from hanging forever on an unresponsive server.
response = requests.post(url, timeout=30)
print(response.json())

Logging hierarchy

From examples/logging_hierarchy_example.py. Demonstrates logger inheritance and levels.
import logging
from keywordsai_tracing.constants import LOGGER_NAME
# Package-level logger, set to DEBUG.
parent_logger = logging.getLogger(LOGGER_NAME)
parent_logger.setLevel(logging.DEBUG)

# Two loggers whose dotted names place them under the parent logger.
child1_logger, child2_logger = (
    logging.getLogger(f"{LOGGER_NAME}.core.exporter"),
    logging.getLogger(f"{LOGGER_NAME}.core.client"),
)

Debug logging

From examples/debug_logging_example.py. Use log_level to control verbosity.
from keywordsai_tracing import KeywordsAITelemetry
# Telemetry instance created with log_level "DEBUG" for this example app.
telemetry_debug = KeywordsAITelemetry(
    app_name="debug_example",
    log_level="DEBUG",
)