
LLMs made it easy to generate answers. But the real opportunity lies in building systems that can act, not just respond.
That’s where agents come in.
Frameworks like CrewAI let you design teams of language models that take on real tasks. But in most setups today, agents only run when you tell them to. They’re passive. They wait.
If we want AI systems that feel more like software teammates — ones that respond to the world around them — we need to wire them into live data.
🧠 Why Agents Need Real-Time Data
Most agent workflows today:
- Rely on manual triggers
- Operate in batch
- Miss the moment something important happens
Now imagine an agent that:
- Receives incoming signals instantly
- Processes and classifies them on the fly
- Takes action — or escalates — without needing you to hit “run”
That’s what it looks like when agents are actually part of your system — not just a tool you query. This should be the default for any agentic application with real-time stakes.
🔌 Conduit: The Data Layer for Autonomous Agents
Conduit is a developer-focused data streaming and processing tool for routing data across your stack. It connects to over 100 sources and destinations (HTTP, Kafka, S3, Postgres, Salesforce, Zendesk, and more), transforms the data with built-in processors (including LLM-backed ones), and pushes it to wherever your agents live.
Think of it as the glue between your real-world signals and your AI workflows.
With Conduit, agents can operate on:
- Live alerts
- User actions
- System logs
- Support tickets
- Anything else that happens in your stack
You don’t need to reinvent stream processing — just plug it in.
🤝 CrewAI + Conduit: A Simple, Powerful Stack
CrewAI lets you define a set of agents with clear roles and tasks. It's easy to use, flexible, and built for developers who want more than just a chatbot.
When you pair CrewAI with Conduit:
- Your agents get structured, enriched input the moment it matters
- You avoid polling, delays, and brittle pipelines
- You get closer to building systems that act with autonomy — not just automation
🛠 What We’ll Build: Real-Time Incident Response Agent
Incidents happen. But the standard response is still too manual:
- An alert fires
- An engineer sees it (maybe)
- Someone summarizes it
- Someone else posts a Slack update
- Hopefully, someone opens the right runbook
In this walkthrough, we’ll create a Real-Time Incident Commander:
1. Alerts come in through an HTTP endpoint and are published to Kafka
2. Conduit consumes alerts from Kafka and uses OpenAI to:
   - Summarize the incident
   - Classify urgency
3. Enriched messages are streamed back to Kafka
4. A CrewAI team kicks in:
   - TriageBot classifies severity
   - CommsBot writes a Slack update
   - RunbookBot suggests what to do next
5. Slack gets updated in real time
This is how you move from prompt-based AI to AI systems that are always on and always aware.
Let’s build it. 👇
🔧 System Architecture
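One alert flows through the stack in a single direction:

Alert ─HTTP→ Flask API → Kafka (raw_alerts) → Conduit + OpenAI → Kafka (enriched_alerts) → CrewAI agents → Slack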
🧰 Prerequisites
To follow along, you’ll need:
- Python 3.10+ (CrewAI requires 3.10 or newer; the Dockerfiles below use 3.10)
- Docker + Docker Compose
- Kafka (via Bitnami image)
- OpenAI API Key
- Slack Bot Token and Channel ID
📁 Folder Structure
project-root/
├── docker-compose.yml
├── .env
├── pipeline.yaml
├── alert_api/
│   ├── app.py
│   └── Dockerfile
└── crewai_runner/
    ├── agent_runner.py
    └── Dockerfile
📄 .env
(Secrets & Config)
OPENAI_API_KEY=sk-...
SLACK_BOT_TOKEN=xoxb-...
SLACK_CHANNEL_ID=C01...
KAFKA_BOOTSTRAP_SERVER=kafka:9092
📦 docker-compose.yml
(KRaft Kafka + Conduit + Flask + Crew)
version: '3.8'
services:
  kafka:
    image: bitnami/kafka:latest
    ports:
      - "9092:9092"
    environment:
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=broker,controller
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - ALLOW_PLAINTEXT_LISTENER=yes

  conduit:
    image: ghcr.io/conduitio/conduit:latest
    ports:
      - "8080:8080"
    volumes:
      # Conduit loads pipeline configs from /app/pipelines by default
      - ./pipeline.yaml:/app/pipelines/pipeline.yaml
    depends_on:
      - kafka
    env_file:
      - .env

  flask-alert-api:
    build:
      context: ./alert_api
    ports:
      - "5000:5000"
    env_file:
      - .env
    depends_on:
      - kafka

  crewai-runner:
    build:
      context: ./crewai_runner
    env_file:
      - .env
    depends_on:
      - kafka
🧠 pipeline.yaml
(Conduit: Kafka → OpenAI → Kafka)
version: "2.2"
pipelines:
  - id: incident-pipeline
    status: running
    connectors:
      - id: kafka-source
        type: source
        plugin: builtin:kafka
        settings:
          servers: ${KAFKA_BOOTSTRAP_SERVER}
          topics: raw_alerts
      - id: kafka-out
        type: destination
        plugin: builtin:kafka
        settings:
          servers: ${KAFKA_BOOTSTRAP_SERVER}
          topic: enriched_alerts
    processors:
      - id: summarize
        plugin: openai.textgen
        settings:
          api_key: ${OPENAI_API_KEY}
          model: gpt-4o-mini   # any chat-completion model works here
          developer_message: >
            Summarize this alert, classify its urgency (low, medium, high),
            and carry over the instance name.
            Format as JSON: {"summary": "...", "urgency": "...", "instance": "..."}
          field: .Payload.After
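For the test alert we'll send at the end, the enriched record's payload should look roughly like this (the exact wording depends on the model):

{"summary": "api-1 is above 90% memory usage", "urgency": "high", "instance": "api-1"}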
🌐 alert_api/app.py
(Flask → Kafka)
from flask import Flask, request
from kafka import KafkaProducer
import os, json, logging

logging.basicConfig(level=logging.INFO)

producer = KafkaProducer(
    bootstrap_servers=os.getenv("KAFKA_BOOTSTRAP_SERVER", "localhost:9092"),
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

app = Flask(__name__)

@app.route('/alert', methods=['POST'])
def alert():
    try:
        data = request.get_json()
        logging.info(f"Received alert: {data}")
        producer.send("raw_alerts", data)
        return {"status": "ok"}, 200
    except Exception as e:
        logging.error(f"Kafka send failed: {e}")
        return {"error": str(e)}, 500

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)
🐳 alert_api/Dockerfile
FROM python:3.10-slim
WORKDIR /app
COPY app.py .
RUN pip install flask kafka-python
CMD ["python", "app.py"]
🤖 crewai_runner/agent_runner.py
(Kafka → CrewAI → Slack)
import os, json, logging
from kafka import KafkaConsumer
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
from crewai import Agent, Task, Crew
from dotenv import load_dotenv

load_dotenv()
logging.basicConfig(level=logging.INFO)

slack = WebClient(token=os.getenv("SLACK_BOT_TOKEN"))
channel = os.getenv("SLACK_CHANNEL_ID")

# Agents pick up the OpenAI key from the environment (.env)
triage_agent = Agent(role="TriageBot", goal="Classify severity", backstory="Knows alert patterns.")
comms_agent = Agent(role="CommsBot", goal="Write updates", backstory="Keeps teams informed.")
runbook_agent = Agent(role="RunbookBot", goal="Suggest fixes", backstory="Knows infra patterns.")

# {summary}, {urgency}, and {instance} are interpolated from the alert at kickoff time
tasks = [
    Task(
        description="Triage this alert and confirm its severity: {summary} (urgency: {urgency})",
        expected_output="A one-line severity assessment, prefixed with 🧠.",
        agent=triage_agent,
    ),
    Task(
        description="Write a short Slack status update for the incident: {summary}",
        expected_output="A one-paragraph Slack message, prefixed with 📢.",
        agent=comms_agent,
    ),
    Task(
        description=(
            "Suggest remediation for instance {instance} given: {summary}. "
            "Reference the relevant runbook, e.g. https://runbooks.myorg.dev/42."
        ),
        expected_output="A short, actionable next step, prefixed with 🔧.",
        agent=runbook_agent,
    ),
]
crew = Crew(agents=[triage_agent, comms_agent, runbook_agent], tasks=tasks)

consumer = KafkaConsumer(
    'enriched_alerts',
    bootstrap_servers=os.getenv("KAFKA_BOOTSTRAP_SERVER", "localhost:9092"),
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

for msg in consumer:
    alert = msg.value
    if not all(k in alert for k in ["summary", "urgency", "instance"]):
        logging.warning("⚠️ Malformed alert: skipping")
        continue
    # Run the three tasks sequentially for this alert
    result = crew.kickoff(inputs=alert)
    text = "\n".join(t.raw for t in result.tasks_output)
    try:
        slack.chat_postMessage(channel=channel, text=text)
        logging.info("✅ Slack notification sent")
    except SlackApiError as e:
        logging.error(f"❌ Slack error: {e.response['error']}")
🤖 crewai_runner/Dockerfile
FROM python:3.10-slim
WORKDIR /app
COPY agent_runner.py .
RUN pip install kafka-python slack_sdk crewai openai python-dotenv
CMD ["python", "agent_runner.py"]
🧪 Start the Stack
docker-compose up --build
Then test it:
curl -X POST http://localhost:5000/alert \
-H "Content-Type: application/json" \
-d '{"alertname": "HighMemoryUsage", "instance": "api-1", "description": "Memory > 90%"}'
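If nothing shows up in Slack, you can inspect the enrichment step directly. The Bitnami image ships Kafka's CLI tools, so something like this should tail the enriched topic:

docker-compose exec kafka kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic enriched_alerts \
  --from-beginning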
✅ Done.
You now have a working autonomous pipeline that:
- Accepts alerts via HTTP
- Streams data in real time using Kafka
- Enriches it with OpenAI
- Triggers AI agents
- Notifies Slack with no human in the loop
💥 Extend It
- Trigger PagerDuty or GitHub Actions based on agent output
- Route alerts by team, urgency, or region
- Store incident data in S3/Postgres for postmortems
- Add human-in-the-loop escalation if confidence is low
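Routing by urgency, for instance, can be as small as picking the Slack channel per alert in agent_runner.py. A minimal sketch, where SLACK_ONCALL_CHANNEL_ID is a hypothetical extra variable you'd add to .env:

# Hypothetical: send high-urgency alerts to an on-call channel instead.
# SLACK_ONCALL_CHANNEL_ID is an assumed addition to .env; fall back to the default channel.
CHANNELS = {"high": os.getenv("SLACK_ONCALL_CHANNEL_ID", channel)}
target = CHANNELS.get(alert.get("urgency"), channel)
slack.chat_postMessage(channel=target, text=text)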
🧠 Takeaway
This isn’t just another monitoring pipeline. It’s the foundation of an autonomous incident response system powered by:
- Conduit: Real-time data movement and enrichment
- CrewAI: Agentic orchestration with role-based reasoning
- Kafka: Scalable, decoupled message routing
- Slack: Fast, familiar team communication
Ready to build autonomous systems that don't just think, but act? The future of incident response isn't about faster alerts; it's about systems that understand, communicate, and solve problems in real time. Start with this demo, then build on it: streaming data pipelines with Conduit, autonomous agents with CrewAI, and real-time communication with your team. Don't wait for the next incident. Build your autonomous response system today.