Stream, Think, Act: Building Autonomous Incident Response Agents with Kafka, Conduit, and CrewAI

By DeVaris Brown

28 May 2025

Crew and Conduit for real time agents

LLMs made it easy to generate answers. But the real opportunity lies in building systems that can act, not just respond.

That’s where agents come in.

Frameworks like CrewAI let you design teams of language models that take on real tasks. But in most setups today, agents only run when you tell them to. They’re passive. They wait.

If we want AI systems that feel more like software teammates — ones that respond to the world around them — we need to wire them into live data.


🧠 Why Agents Need Real-Time Data

Most agent workflows today:

  • Rely on manual triggers
  • Operate in batch
  • Miss the moment something important happens

Now imagine an agent that:

  • Receives incoming signals instantly
  • Processes and classifies them on the fly
  • Takes action — or escalates — without needing you to hit “run”

That’s what it looks like when agents are actually part of your system — not just a tool you query. This should be the default for any agentic application with real-time stakes.


🔌 Conduit: The Data Layer for Autonomous Agents

Conduit is a developer-focused data streaming & processing tool for routing data across your stack. It connects over 100 sources like HTTP, Kafka, S3, Postgres, Salesforce, and Zendesk, transforms the data with built-in processors (including LLMs), and pushes it to wherever your agents live.

Think of it as the glue between your real-world signals and your AI workflows.

With Conduit, agents can operate on:

  • Live alerts
  • User actions
  • System logs
  • Support tickets
  • Anything else that happens in your stack

You don’t need to reinvent stream processing — just plug it in.


🤝 CrewAI + Conduit: A Simple, Powerful Stack

CrewAI lets you define a set of agents with clear roles and tasks. It's easy to use, flexible, and built for developers who want more than just a chatbot.

When you pair CrewAI with Conduit:

  • Your agents get structured, enriched input the moment it matters
  • You avoid polling, delays, and brittle pipelines
  • You get closer to building systems that act with autonomy — not just automation

🛠 What We’ll Build: Real-Time Incident Response Agent

Incidents happen. But the standard response is still too manual:

  • An alert fires
  • An engineer sees it (maybe)
  • Someone summarizes it
  • Someone else posts a Slack update
  • Hopefully, someone opens the right runbook

In this walkthrough, we’ll create a Real-Time Incident Commander:

  • Alerts come in through an HTTP endpoint, which publishes them to a Kafka topic (raw_alerts)
  • Conduit consumes the raw alerts from Kafka and uses OpenAI to:
    • Summarize the incident
    • Classify urgency
  • The enriched messages are streamed back to Kafka (enriched_alerts)
  • A CrewAI team kicks in:
    • TriageBot classifies severity
    • CommsBot writes a Slack update
    • RunbookBot suggests what to do next
  • Slack gets updated in real time

This is how you move from prompt-based AI to AI systems that are always on and always aware.

Let’s build it. 👇


🔧 System Architecture

[Diagram: System Architecture for Crew + Conduit]


🧰 Prerequisites

To follow along, you’ll need:

  • Python 3.10+ (CrewAI requires 3.10 or newer; the Dockerfiles below use python:3.10-slim)
  • Docker + Docker Compose
  • Kafka (via Bitnami image)
  • OpenAI API Key
  • Slack Bot Token and Channel ID

📁 Folder Structure

project-root/
├── docker-compose.yml
├── .env
├── pipeline.yaml
├── alert_api/
│   ├── app.py
│   └── Dockerfile
├── crewai_runner/
│   ├── agent_runner.py
│   └── Dockerfile

📄 .env (Secrets & Config)

OPENAI_API_KEY=sk-...
SLACK_BOT_TOKEN=xoxb-...
SLACK_CHANNEL_ID=C01...
KAFKA_BOOTSTRAP_SERVER=kafka:9092
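
Because every service reads these values from the environment, it can help to fail fast when one is missing rather than hit an opaque Kafka or Slack error later. The following is a minimal sketch, not part of the original project; the helper names `missing_env` and `require_env` are hypothetical, and the variable list simply mirrors the .env file above.

```python
import os

# Variable names mirror the .env file; adjust if your setup differs.
REQUIRED_VARS = (
    "OPENAI_API_KEY",
    "SLACK_BOT_TOKEN",
    "SLACK_CHANNEL_ID",
    "KAFKA_BOOTSTRAP_SERVER",
)


def missing_env(env=os.environ, required=REQUIRED_VARS):
    """Return the required variable names that are unset or blank."""
    return [name for name in required if not env.get(name, "").strip()]


def require_env(env=os.environ):
    """Raise at startup if any required variable is missing (hypothetical helper)."""
    missing = missing_env(env)
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
```

Calling `require_env()` at the top of app.py or agent_runner.py would be one way to use it.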

📦 docker-compose.yml (KRaft Kafka + Conduit + Flask + Crew)

version: '3.8'

services:
  kafka:
    image: bitnami/kafka:latest
    ports:
      - "9092:9092"
    environment:
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=broker,controller
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - ALLOW_PLAINTEXT_LISTENER=yes

  conduit:
    image: ghcr.io/conduitio/conduit:latest
    ports:
      - "8080:8080"
    volumes:
      - ./pipeline.yaml:/etc/conduit/pipeline.yaml
    command: ["./conduit", "run", "/etc/conduit/pipeline.yaml"]
    depends_on:
      - kafka
    env_file:
      - .env

  flask-alert-api:
    build:
      context: ./alert_api
    ports:
      - "5000:5000"
    env_file:
      - .env
    depends_on:
      - kafka

  crewai-runner:
    build:
      context: ./crewai_runner
    env_file:
      - .env
    depends_on:
      - kafka

🧠 pipeline.yaml (Conduit: Kafka → OpenAI → Kafka)

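version: 2.2

pipelines:
  - id: incident-pipeline
    status: running
    connectors:
      - id: kafka-source
        type: source
        plugin: builtin:kafka
        settings:
          servers: ${KAFKA_BOOTSTRAP_SERVER}
          topics: raw_alerts

      - id: kafka-out
        type: destination
        plugin: builtin:kafka
        settings:
          servers: ${KAFKA_BOOTSTRAP_SERVER}
          # The Kafka destination takes a single `topic`
          topic: enriched_alerts

    # Processors are declared in their own list, not under `connectors`
    processors:
      - id: summarize
        plugin: openai.textgen
        settings:
          api_key: ${OPENAI_API_KEY}
          # Assumed model name; use any chat model your key can access
          model: gpt-4o-mini
          developer_message: >
            Summarize this alert and classify its urgency (low, medium, high).
            Copy the alert's instance field unchanged.
            Format as JSON: {"summary": "...", "urgency": "...", "instance": "..."}
          # Overwrite the record payload with the model's reply
          field: .Payload.After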
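
The prompt asks the model for JSON, but a model can still wrap its reply in a Markdown code fence or return an urgency outside the three allowed values. The sketch below shows one way a consumer might parse that reply defensively. It is an illustration rather than part of Conduit or the original runner, and `parse_enrichment` is a hypothetical helper name.

```python
import json
import re

VALID_URGENCY = {"low", "medium", "high"}
FENCE = "`" * 3  # Markdown code-fence marker, built here to keep this listing readable
FENCE_RE = re.compile(r"^%s(?:json)?\s*|\s*%s$" % (FENCE, FENCE))


def parse_enrichment(text):
    """Parse the model's reply into a dict, or return None if it is unusable."""
    # Drop a leading/trailing code fence if the model added one.
    cleaned = FENCE_RE.sub("", text.strip())
    try:
        data = json.loads(cleaned)
    except json.JSONDecodeError:
        return None
    if not isinstance(data, dict) or not isinstance(data.get("summary"), str):
        return None
    urgency = str(data.get("urgency", "")).lower()
    # Fall back to "medium" when the value is not one of the three allowed levels.
    data["urgency"] = urgency if urgency in VALID_URGENCY else "medium"
    return data
```

A function like this could replace the plain `json.loads` in a Kafka consumer's `value_deserializer`, so one malformed reply is skipped instead of stopping the consumer.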

🌐 alert_api/app.py (Flask → Kafka)

from flask import Flask, request
from kafka import KafkaProducer
import os, json, logging

logging.basicConfig(level=logging.INFO)

producer = KafkaProducer(
    bootstrap_servers=os.getenv("KAFKA_BOOTSTRAP_SERVER", "localhost:9092"),
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

app = Flask(__name__)

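@app.route('/alert', methods=['POST'])
def alert():
    # silent=True returns None instead of raising on a missing or invalid JSON body
    data = request.get_json(silent=True)
    if data is None:
        return {"error": "request body must be JSON"}, 400
    try:
        logging.info(f"Received alert: {data}")
        # send() is asynchronous; wait on the result so broker errors reach the except below
        producer.send("raw_alerts", data).get(timeout=10)
        return {"status": "ok"}, 200
    except Exception as e:
        logging.error(f"Kafka send failed: {e}")
        return {"error": str(e)}, 500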

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)
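
The endpoint forwards whatever JSON it receives, so a malformed alert only surfaces further down the pipeline. As a rough sketch, a small validation step could reject bad input before it reaches Kafka. The required field names here are an assumption taken from the sample request later in this post, and `validate_alert` is a hypothetical helper, not part of Flask.

```python
# Assumed from the sample curl request; adjust to your alert source.
REQUIRED_FIELDS = ("alertname", "instance", "description")


def validate_alert(data):
    """Return (ok, error_message) for a decoded JSON request body."""
    if not isinstance(data, dict):
        return False, "body must be a JSON object"
    missing = [
        field for field in REQUIRED_FIELDS
        if data.get(field) is None or not str(data[field]).strip()
    ]
    if missing:
        return False, "missing fields: " + ", ".join(missing)
    return True, ""
```

Inside the route, a failed check would typically map to a 400 response rather than a 500.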

🐳 alert_api/Dockerfile

FROM python:3.10-slim

WORKDIR /app
COPY app.py .

RUN pip install flask kafka-python

CMD ["python", "app.py"]

🤖 crewai_runner/agent_runner.py (Kafka → CrewAI → Slack)

import os, json, logging
from kafka import KafkaConsumer
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
from crewai import Agent, Task, Crew
from dotenv import load_dotenv

load_dotenv()
logging.basicConfig(level=logging.INFO)

slack = WebClient(token=os.getenv("SLACK_BOT_TOKEN"))
channel = os.getenv("SLACK_CHANNEL_ID")

# Agent fields must be passed as keyword arguments (role, goal, backstory)
triage_agent = Agent(role="TriageBot", goal="Classify severity", backstory="Knows alert patterns.")
comms_agent = Agent(role="CommsBot", goal="Write updates", backstory="Keeps teams informed.")
runbook_agent = Agent(role="RunbookBot", goal="Suggest fixes", backstory="Knows infra patterns.")

# {summary}, {urgency} and {instance} are filled in from the inputs passed to kickoff()
tasks = [
    Task(
        description="Triage this alert: {summary} (reported urgency: {urgency}).",
        expected_output="🧠 Triage: one line stating the severity and why.",
        agent=triage_agent,
    ),
    Task(
        description="Write a short Slack update for this alert: {summary}.",
        expected_output="📢 Update: one or two sentences for the on-call channel.",
        agent=comms_agent,
    ),
    Task(
        description="Suggest remediation for instance `{instance}`. "
                    "Point to Runbook #42 (https://runbooks.myorg.dev/42) if a restart is appropriate.",
        expected_output="🔧 One concrete next step.",
        agent=runbook_agent,
    ),
]

crew = Crew(agents=[triage_agent, comms_agent, runbook_agent], tasks=tasks)

consumer = KafkaConsumer(
    'enriched_alerts',
    bootstrap_servers=os.getenv("KAFKA_BOOTSTRAP_SERVER", "localhost:9092"),
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

for msg in consumer:
    alert = msg.value
    if not isinstance(alert, dict) or not all(k in alert for k in ["summary", "urgency"]):
        logging.warning("⚠️ Malformed alert: skipping")
        continue
    # The enrichment step may not carry the instance through, so default it
    alert.setdefault("instance", "unknown")
    # kickoff() runs the tasks in order and returns a CrewOutput
    result = crew.kickoff(inputs=alert)
    responses = [task_output.raw for task_output in result.tasks_output]
    try:
        slack.chat_postMessage(channel=channel, text="\n".join(responses))
        logging.info("✅ Slack notification sent")
    except SlackApiError as e:
        logging.error(f"❌ Slack error: {e.response['error']}")
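
One thing to watch in the consumer loop above: depending on how the Conduit destination serializes records, the Kafka message may not be the bare {"summary": ...} object. It may instead be an OpenCDC-style envelope with the payload nested under payload.after, base64-encoded when the payload is raw bytes. That envelope shape is an assumption about your Conduit output format, so inspect a real message before relying on it. A hedged sketch of a helper (the name `extract_alert` is hypothetical) that accepts either shape:

```python
import base64
import json


def extract_alert(message):
    """Return the alert dict from a decoded Kafka message value, or None.

    Accepts a bare alert object or an envelope of the assumed form
    {"payload": {"after": <dict or base64-encoded JSON string>}}.
    """
    if not isinstance(message, dict):
        return None
    payload = message.get("payload")
    if not isinstance(payload, dict) or "after" not in payload:
        return message  # already the bare alert
    after = payload["after"]
    if isinstance(after, dict):
        return after  # structured payload, no decoding needed
    try:
        decoded = json.loads(base64.b64decode(after))
    except (ValueError, TypeError):
        # Covers bad base64, non-UTF-8 bytes, invalid JSON and non-string input.
        return None
    return decoded if isinstance(decoded, dict) else None
```

In the loop, `alert = extract_alert(msg.value)` followed by a `None` check would slot in before the existing key validation.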

🤖 crewai_runner/Dockerfile

FROM python:3.10-slim

WORKDIR /app

COPY agent_runner.py .

RUN pip install kafka-python slack_sdk crewai openai python-dotenv

CMD ["python", "agent_runner.py"]

🧪 Start the Stack

docker-compose up --build

Then test it:

curl -X POST http://localhost:5000/alert \
  -H "Content-Type: application/json" \
  -d '{"alertname": "HighMemoryUsage", "instance": "api-1", "description": "Memory > 90%"}'
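
If you would rather send test alerts from a script, the same request can be built with Python's standard library. This is a small sketch equivalent to the curl command above; the function names `build_alert_request` and `send_alert` are hypothetical, and the URL assumes the port mapping from docker-compose.yml.

```python
import json
import urllib.request

ALERT_URL = "http://localhost:5000/alert"  # matches the flask-alert-api port mapping


def build_alert_request(alert, url=ALERT_URL):
    """Build a POST request carrying the alert as a JSON body."""
    body = json.dumps(alert).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send_alert(alert, url=ALERT_URL):
    """Send the alert and return (status_code, decoded_json_response)."""
    with urllib.request.urlopen(build_alert_request(alert, url), timeout=10) as resp:
        return resp.status, json.loads(resp.read())
```

With the stack running, `send_alert({"alertname": "HighMemoryUsage", "instance": "api-1", "description": "Memory > 90%"})` posts the same payload as the curl example.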

✅ Done.

You now have a working autonomous pipeline that:

  • Accepts alerts via HTTP
  • Streams data in real time using Kafka
  • Enriches it with OpenAI
  • Triggers AI agents
  • Notifies Slack with no human in the loop

💥 Extend It

  • Trigger PagerDuty or GitHub Actions based on agent output
  • Route alerts by team, urgency, or region
  • Store incident data in S3/Postgres for postmortems
  • Add human-in-the-loop escalation if confidence is low
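
As a starting point for the routing and escalation ideas above, here is a minimal sketch of a routing function. Everything in it is hypothetical: the destination names, the `confidence` score (which the pipeline in this post does not produce), and the 0.7 threshold are placeholders for whatever your own setup provides.

```python
def route_alert(alert, confidence=None, min_confidence=0.7):
    """Return the list of destinations for an enriched alert.

    `confidence` is an optional score in [0, 1] from an upstream step;
    it is an assumption and not emitted by the pipeline as written.
    """
    urgency = str(alert.get("urgency", "medium")).lower()
    routes = ["slack"]  # every alert still goes to Slack
    if urgency == "high":
        routes.append("pagerduty")  # page on-call for high urgency
    if confidence is not None and confidence < min_confidence:
        routes.append("human_review")  # escalate when the agents are unsure
    return routes
```

The runner could call this after the crew finishes and dispatch to each destination in turn.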

🧠 Takeaway

This isn’t just another monitoring pipeline. It’s the foundation of an autonomous incident response system powered by:

  • Conduit: Real-time data movement and enrichment
  • CrewAI: Agentic orchestration with role-based reasoning
  • Kafka: Scalable, decoupled message routing
  • Slack: Fast, familiar team communication

Ready to build autonomous systems that don't just think, but act? The future of incident response isn't about faster alerts — it's about intelligent systems that understand, communicate, and solve problems in real-time.

Your next-generation incident response system is just a few commands away. Start with this demo, then imagine what you could build when you combine:

  • Streaming data pipelines with Conduit
  • Autonomous agents with CrewAI
  • Real-time communication with your team

Don't wait for the next incident. Build your autonomous response system today.


DeVaris Brown

CEO and Co-Founder @ Meroxa.