Push live updates to clients instantly. Build reactive UIs like Uber's live tracking, Airbnb's instant booking confirmations, and real-time dashboards.
Booking confirmations, payment status, task progress, agent activity
New messages, approval requests, system alerts, reminders
Presence indicators, typing status, live edits, shared cursors
Price changes, availability updates, analytics dashboards, metrics
| Feature | Server-Sent Events (SSE) | WebSocket |
|---|---|---|
| Direction | Server → Client only | Bidirectional |
| Protocol | HTTP (simpler) | WS protocol (more complex) |
| Reconnection | Automatic | Manual |
| Best For | Notifications, status updates | Chat, gaming, collaboration |
| Recommendation | Use for most cases | Use when bidirectional communication is needed |
// app/api/events/stream/route.ts
import { NextRequest } from "next/server"
import { Redis } from "@upstash/redis" // Assuming Redis is used for pub/sub
/**
 * Server-Sent Events endpoint that streams realtime events to one client.
 *
 * Query parameters:
 * - `userId` (required): the stream subscribes to this user's channel.
 * - `channels` (optional): comma-separated channel names; blank entries are ignored.
 *
 * The stream sends a `connected` event, then one named SSE event per
 * published message, plus a comment-line heartbeat every 30 seconds.
 *
 * @returns A `text/event-stream` response, or a 400 JSON response when
 *          `userId` is missing.
 */
export async function GET(request: NextRequest) {
  const searchParams = request.nextUrl.searchParams
  const userId = searchParams.get("userId")

  // Without this guard a missing userId would subscribe to the literal
  // channel "user:null".
  if (!userId) {
    return new Response(JSON.stringify({ error: "userId is required" }), {
      status: 400,
      headers: { "Content-Type": "application/json" }
    })
  }
  // NOTE(review): userId is taken from the query string as-is; confirm that
  // the caller is authenticated as this user before exposing the route.

  // "".split(",") yields [""], so drop blank names instead of subscribing
  // to an empty channel.
  const channels = (searchParams.get("channels") ?? "")
    .split(",")
    .map((name) => name.trim())
    .filter((name) => name.length > 0)

  const encoder = new TextEncoder()
  let closed = false
  let heartbeat: ReturnType<typeof setInterval> | undefined
  let subscription: { unsubscribe: () => unknown } | undefined
  let streamController: ReadableStreamDefaultController<Uint8Array> | undefined

  // Unsubscribe without letting a sync throw or async rejection escape.
  const release = (sub: { unsubscribe: () => unknown }) => {
    Promise.resolve()
      .then(() => sub.unsubscribe())
      .catch((error: unknown) => {
        console.error("Failed to unsubscribe from realtime events:", error)
      })
  }

  // Idempotent teardown shared by client abort and stream cancellation.
  const cleanup = () => {
    if (closed) return
    closed = true
    if (heartbeat !== undefined) clearInterval(heartbeat)
    if (subscription) release(subscription)
    request.signal.removeEventListener("abort", cleanup)
    try {
      streamController?.close()
    } catch {
      // The stream was already closed or cancelled by the consumer.
    }
  }

  const stream = new ReadableStream<Uint8Array>({
    async start(controller) {
      streamController = controller

      // Never enqueue into a closed stream; a failed write tears down.
      const send = (chunk: string) => {
        if (closed) return
        try {
          controller.enqueue(encoder.encode(chunk))
        } catch {
          cleanup()
        }
      }

      // Register before awaiting so an early disconnect is not missed.
      request.signal.addEventListener("abort", cleanup)
      if (request.signal.aborted) {
        cleanup()
        return
      }

      // Send initial connection event
      send(`event: connected\ndata: {"status": "connected"}\n\n`)

      // Subscribe to events
      let pending: { unsubscribe: () => unknown }
      try {
        pending = await subscribeToEvents(userId, channels, (event) => {
          const data = JSON.stringify(event)
          send(`event: ${event.type}\ndata: ${data}\n\n`)
        })
      } catch (error) {
        // Rethrowing from start() puts the stream into the errored state.
        closed = true
        request.signal.removeEventListener("abort", cleanup)
        throw error
      }

      // The client went away while the subscription was being set up.
      if (closed) {
        release(pending)
        return
      }
      subscription = pending

      // Send heartbeat every 30 seconds (an SSE comment line)
      heartbeat = setInterval(() => send(`: heartbeat\n\n`), 30000)
    },
    cancel() {
      cleanup()
    }
  })

  return new Response(stream, {
    headers: {
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache, no-transform",
      "Connection": "keep-alive",
      "X-Accel-Buffering": "no" // Disable nginx buffering
    }
  })
}
/**
 * Subscribes to the Redis pub/sub channels for one stream connection.
 *
 * Listens on `user:<userId>` plus `channel:<name>` for every entry in
 * `channels`, and forwards each message to `callback` as an object whose
 * `type` defaults to the second `:`-separated segment of the Redis channel
 * name. A `type` field in the message itself overrides that default.
 *
 * @param userId   User whose personal channel is subscribed.
 * @param channels Extra channel names (without the `channel:` prefix).
 * @param callback Invoked once per successfully decoded message.
 * @returns A handle whose `unsubscribe()` removes the same channels.
 */
async function subscribeToEvents(
  userId: string,
  channels: string[],
  callback: (event: any) => void
) {
  // A fresh client is created for every call (one per open stream).
  const redis = new Redis({
    url: process.env.KV_REST_API_URL!,
    token: process.env.KV_REST_API_TOKEN!
  })

  // Subscribe to user-specific and general channels
  const subscriptionChannels = [
    `user:${userId}`,
    ...channels.map((c) => `channel:${c}`)
  ]

  // NOTE(review): confirm that this client's `subscribe` accepts
  // (...channels, listener) and hands the listener (message, channel).
  await redis.subscribe(...subscriptionChannels, (message: unknown, channel: string) => {
    let payload: unknown
    try {
      // Tolerate clients that hand over an already-deserialized payload.
      payload = typeof message === "string" ? JSON.parse(message) : message
    } catch (error) {
      // One malformed message must not throw inside the listener.
      console.error(`Dropping malformed realtime message on ${channel}:`, error)
      return
    }

    const fields =
      payload !== null && typeof payload === "object" ? payload : { data: payload }

    callback({
      type: channel.split(":")[1],
      ...fields
    })
  })

  return {
    unsubscribe: () => redis.unsubscribe(...subscriptionChannels)
  }
}
// hooks/use-event-stream.ts
"use client"
import { useCallback, useEffect, useRef, useState } from "react"
/** Options accepted by `useEventStream`. */
interface EventStreamOptions {
  /** Sent to the stream endpoint as the `userId` query parameter. */
  userId: string
  /** Channel names, sent comma-joined as the `channels` query parameter. */
  channels?: string[]
  /** Called with the parsed payload of every handled event. */
  onEvent?: (event: any) => void
}
/**
 * Opens an EventSource to `/api/events/stream` and tracks its state.
 *
 * Handles the `booking_update`, `notification` and `agent_status` events:
 * each payload is JSON-parsed, stored as `lastEvent` and passed to `onEvent`.
 * The connection is re-created only when `userId` or the set of channel
 * names changes; a new `onEvent` identity does not cause a reconnect.
 *
 * @returns `isConnected` (true after the `connected` event, false after an
 *          error) and `lastEvent` (the most recent parsed payload, or null).
 */
export function useEventStream({ userId, channels = [], onEvent }: EventStreamOptions) {
  const [isConnected, setIsConnected] = useState(false)
  const [lastEvent, setLastEvent] = useState<any>(null)

  // Keep the latest callback in a ref so an inline `onEvent` (new identity
  // on every render) does not tear down and reopen the connection.
  const onEventRef = useRef(onEvent)
  useEffect(() => {
    onEventRef.current = onEvent
  }, [onEvent])

  // Arrays get a new identity each render; compare by content instead.
  const channelKey = channels.join(",")

  useEffect(() => {
    const params = new URLSearchParams({
      userId,
      channels: channelKey
    })
    const eventSource = new EventSource(`/api/events/stream?${params}`)

    const handleEvent = (e: MessageEvent) => {
      let data: unknown
      try {
        data = JSON.parse(e.data)
      } catch (error) {
        console.error("Ignoring malformed event payload:", error)
        return
      }
      setLastEvent(data)
      onEventRef.current?.(data)
    }

    eventSource.addEventListener("connected", () => {
      setIsConnected(true)
    })
    for (const eventType of ["booking_update", "notification", "agent_status"]) {
      eventSource.addEventListener(eventType, handleEvent)
    }

    eventSource.onerror = () => {
      setIsConnected(false)
      // EventSource automatically reconnects
    }

    return () => {
      eventSource.close()
    }
  }, [userId, channelKey])

  return { isConnected, lastEvent }
}
/**
 * Usage example: shows the live status of one booking.
 *
 * Subscribes to the `booking:<bookingId>` channel and updates the displayed
 * status whenever an event for this booking arrives.
 *
 * @param bookingId Booking whose status is displayed.
 * @param userId    User the stream is opened for; defaults to the
 *                  placeholder id "user_123".
 */
export function BookingStatus({
  bookingId,
  userId = "user_123"
}: {
  bookingId: string
  userId?: string
}) {
  const [status, setStatus] = useState("pending")

  // Memoized so the handler keeps one identity across renders; the hook
  // lists `onEvent` as an effect dependency.
  const handleEvent = useCallback(
    (event: any) => {
      if (event.bookingId === bookingId && typeof event.status === "string") {
        setStatus(event.status)
      }
    },
    [bookingId]
  )

  const { isConnected } = useEventStream({
    userId,
    channels: [`booking:${bookingId}`],
    onEvent: handleEvent
  })

  return (
    <div>
      <div className="flex items-center gap-2">
        <span className={`h-2 w-2 rounded-full ${isConnected ? "bg-green-500" : "bg-red-500"}`} />
        <span>{isConnected ? "Live" : "Connecting..."}</span>
      </div>
      <p>Status: {status}</p>
    </div>
  )
}
// lib/realtime/publish.ts
import { Redis } from "@upstash/redis"
// Assuming inngest is imported and configured elsewhere
// import { inngest } from "@/lib/inngest"
// Redis client used by the publish helpers in this module, configured from
// the KV_REST_API_URL and KV_REST_API_TOKEN environment variables.
const redis = new Redis({
url: process.env.KV_REST_API_URL!,
token: process.env.KV_REST_API_TOKEN!
})
/** Payload published to a realtime channel; the publish helpers JSON-serialize it. */
interface RealtimeEvent {
// Event name, e.g. "booking_update".
type: string
// Any additional event-specific fields.
[key: string]: any
}
/**
 * Publishes an event on a single user's channel.
 *
 * @param userId Recipient; the Redis channel is `user:<userId>`.
 * @param event  Event to send, serialized as JSON.
 */
export async function publishToUser(userId: string, event: RealtimeEvent) {
  const target = `user:${userId}`
  const payload = JSON.stringify(event)
  await redis.publish(target, payload)
}
/**
 * Publishes an event on a named channel that several users may listen to.
 *
 * @param channel Channel name; the Redis channel is `channel:<channel>`.
 * @param event   Event to send, serialized as JSON.
 */
export async function publishToChannel(channel: string, event: RealtimeEvent) {
  const target = `channel:${channel}`
  const payload = JSON.stringify(event)
  await redis.publish(target, payload)
}
/**
 * Publishes an event on the `channel:global` Redis channel.
 *
 * @param event Event to send, serialized as JSON.
 */
export async function broadcast(event: RealtimeEvent) {
  const payload = JSON.stringify(event)
  await redis.publish("channel:global", payload)
}
/**
 * Example: announces a booking status change.
 *
 * Builds one `booking_update` event stamped with the current time and
 * publishes it, concurrently, to the guest's channel, the host's channel
 * and the `booking:<bookingId>` channel.
 *
 * @param bookingId Booking that changed.
 * @param guestId   User id of the guest.
 * @param hostId    User id of the host.
 * @param status    New booking status.
 */
export async function notifyBookingUpdate(
  bookingId: string,
  guestId: string,
  hostId: string,
  status: string
) {
  const timestamp = new Date().toISOString()
  const event = { type: "booking_update", bookingId, status, timestamp }

  // All three publishes are started before any of them is awaited.
  const deliveries = [
    publishToUser(guestId, event),
    publishToUser(hostId, event),
    publishToChannel(`booking:${bookingId}`, event)
  ]
  await Promise.all(deliveries)
}
// Integration with Inngest (example, assuming inngest is configured)
// export const notifyOnBookingConfirm = inngest.createFunction(
// { id: "notify-booking-confirm" },
// { event: "booking.confirmed" },
// async ({ event, step }) => {
// await step.run("publish-realtime", async () => {
// await notifyBookingUpdate(
// event.data.bookingId,
// event.data.guestId,
// event.data.hostId,
// "confirmed"
// )
// })
// }
// )
// components/realtime-provider.tsx
"use client"
import { createContext, useContext, useEffect, useState, useRef } from "react"
/** Value exposed through `RealtimeContext`. */
interface RealtimeContextType {
// Whether the provider's event stream is currently open.
isConnected: boolean
// Registers `callback` for events on `channel`; the returned function removes it again.
subscribe: (channel: string, callback: (event: any) => void) => () => void
}
// Defaults to null, so consumers rendered outside a provider receive null.
const RealtimeContext = createContext<RealtimeContextType | null>(null)
/**
 * Opens one EventSource for `userId` and fans incoming events out to the
 * callbacks registered through the context's `subscribe`.
 *
 * Events are routed by the `channel` field of their JSON payload.
 * NOTE(review): the stream URL requests no extra channels, and the booking
 * event built in this file carries no `channel` field. Confirm that the
 * publishers set it, otherwise no subscriber is ever called.
 *
 * @param children Subtree that can use the realtime context.
 * @param userId   User the stream is opened for; changing it reconnects.
 */
export function RealtimeProvider({
  children,
  userId
}: {
  children: React.ReactNode
  userId: string
}) {
  const [isConnected, setIsConnected] = useState(false)
  const eventSourceRef = useRef<EventSource | null>(null)
  const subscribersRef = useRef<Map<string, Set<(e: any) => void>>>(new Map())

  useEffect(() => {
    // Encode the id so characters such as "&" or "#" cannot alter the query.
    const es = new EventSource(`/api/events/stream?userId=${encodeURIComponent(userId)}`)
    eventSourceRef.current = es

    es.onopen = () => setIsConnected(true)
    es.onerror = () => {
      // Do not close here: EventSource reconnects on its own, and nothing
      // in this effect would ever reopen a closed connection.
      setIsConnected(false)
    }

    const dispatch = (e: MessageEvent) => {
      let eventData: any
      try {
        eventData = JSON.parse(e.data)
      } catch (error) {
        console.error("Error parsing event data:", error)
        return
      }

      const channelSubscribers = subscribersRef.current.get(eventData?.channel)
      channelSubscribers?.forEach((cb) => {
        // Isolate subscribers so one throwing callback does not stop the rest.
        try {
          cb(eventData)
        } catch (error) {
          console.error("Realtime subscriber threw:", error)
        }
      })
    }

    // `onmessage` only sees events without an `event:` field. The stream
    // route labels each event with its type, so listen for those names too.
    es.onmessage = dispatch
    for (const eventType of ["booking_update", "notification", "agent_status"]) {
      es.addEventListener(eventType, dispatch)
    }

    return () => {
      es.close()
      eventSourceRef.current = null
    }
  }, [userId])

  const subscribe = (channel: string, callback: (event: any) => void) => {
    if (!subscribersRef.current.has(channel)) {
      subscribersRef.current.set(channel, new Set())
    }
    subscribersRef.current.get(channel)!.add(callback)

    // Return an unsubscribe function
    return () => {
      subscribersRef.current.get(channel)?.delete(callback)
      // Drop empty sets so the map does not grow with dead channels.
      if (subscribersRef.current.get(channel)?.size === 0) {
        subscribersRef.current.delete(channel)
      }
    }
  }

  return (
    <RealtimeContext.Provider value={{ isConnected, subscribe }}>
      {children}
    </RealtimeContext.Provider>
  )
}
/**
 * Hook for components: registers `callback` for events on `channel` through
 * the nearest `RealtimeContext` value and removes it on cleanup.
 *
 * Does nothing when no context value is available.
 *
 * @param channel  Channel to listen on.
 * @param callback Receives each event dispatched for that channel.
 * @returns `isConnected` from the context, or false without a context value.
 */
export function useRealtimeChannel<T>(
  channel: string,
  callback: (event: T) => void
) {
  const realtime = useContext(RealtimeContext)

  useEffect(() => {
    if (realtime) {
      const unsubscribe = realtime.subscribe(channel, callback)
      return unsubscribe
    }
    return undefined
  }, [channel, callback, realtime])

  const isConnected = realtime ? realtime.isConnected : false
  return { isConnected }
}
/**
 * Usage example: coloured dot plus label for one agent's status.
 *
 * Listens on the `agent:<agentId>` channel. Only the statuses "idle",
 * "running" and "error" are accepted; any other value is ignored so the
 * state always matches its declared union type.
 */
function AgentStatusIndicator({ agentId }: { agentId: string }) {
  const [status, setStatus] = useState<"idle" | "running" | "error">("idle")

  useRealtimeChannel<{ status: string }>(
    `agent:${agentId}`, // Assuming the event has a channel like 'agent:agentId'
    (event) => {
      const next = event.status
      // Narrow the untyped wire value instead of casting it.
      if (next === "idle" || next === "running" || next === "error") {
        setStatus(next)
      }
    }
  )

  return (
    <div className="flex items-center gap-2">
      <div className={`h-3 w-3 rounded-full ${
        status === "running" ? "bg-green-500 animate-pulse" :
        status === "error" ? "bg-red-500" : "bg-gray-400"
      }`} />
      <span>{status}</span>
    </div>
  )
}