HTTP vs WebSocket —
The Core Difference
Know when to use WebSockets vs regular HTTP calls. Choose the wrong one and you'll either waste resources or miss real-time capabilities.
HTTP (request/response): The client always initiates; the server can only respond. Each request opens a new connection (or briefly reuses an existing one). Works great for: fetching data on demand, form submissions, CRUD APIs. Angular: HttpClient.get(), post().
WebSocket (persistent, full-duplex): One connection, and both sides can send at any time. The server can push to the client WITHOUT the client asking. Works great for: chat, live scores, stock prices, multiplayer games, streaming ML model results, dashboards.
- Server needs to PUSH data to client (not just respond)
- Very frequent updates (more than once per 2 seconds)
- Real-time collaboration (multiple users affecting same data)
- Low latency matters (games, trading, auctions)
- Long-running operations with progress (ML training, file uploads)
// HTTP
Client: GET /users                          → new TCP connection (or reuse)
Server:                                     ← sends response
-- connection may close --
Client: GET /users/1                        → new request (could be same connection)
Server:                                     ← sends response

// WebSocket
Client: GET /ws (Upgrade: websocket)        → HTTP handshake
Server: 101 Switching Protocols             ← upgrade confirmed
-- SINGLE CONNECTION STAYS OPEN --
Client: {"type": "join", "room": "chat"}    → send anytime
Server: {"type": "message", "text": "hi"}   ← push anytime
Server: {"type": "user_joined"}             ← push anytime
Client: {"type": "message", "text": "yo"}   → send anytime
-- connection closes when either side disconnects --
FastAPI WebSocket
Server
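FastAPI has native async WebSocket support. No extra framework is needed beyond FastAPI itself, but the ASGI server must have a WebSocket implementation installed (for example `pip install websockets`, or `pip install "uvicorn[standard]"`).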
import json
import logging
from datetime import datetime, timezone

from fastapi import APIRouter, WebSocket, WebSocketDisconnect

router = APIRouter()


# Registered before "/ws/{client_id}" so that the literal path "/ws/echo"
# is not captured by the parameterised route below.
@router.websocket("/ws/echo")
async def ws_echo(websocket: WebSocket):
    """Echo every text frame back to the sender, prefixed with ``Echo: ``.

    The loop ends when the client disconnects, which Starlette reports by
    raising ``WebSocketDisconnect`` from the pending ``receive_*`` call.
    """
    await websocket.accept()  # complete the handshake before any I/O
    try:
        while True:
            # receive_text() returns the raw string; receive_json() and
            # receive_bytes() are the JSON / binary counterparts.
            data = await websocket.receive_text()
            # send_json({...}) is the JSON counterpart of send_text().
            await websocket.send_text(f"Echo: {data}")
    except WebSocketDisconnect:
        # A client going away is normal, not an error.
        logging.info("Client disconnected")


@router.websocket("/ws/{client_id}")
async def ws_with_id(websocket: WebSocket, client_id: str):
    """Wrap each JSON message from the client in an envelope and send it back.

    Args:
        websocket: The connection being served.
        client_id: Path parameter identifying the caller; echoed as ``from``.
    """
    await websocket.accept()
    try:
        while True:
            msg = await websocket.receive_json()
            await websocket.send_json({
                "from": client_id,
                "data": msg,
                # Timezone-aware UTC: the "+00:00" suffix lets browsers parse
                # the value as UTC instead of guessing local time.
                "timestamp": datetime.now(timezone.utc).isoformat(),
            })
    except WebSocketDisconnect:
        logging.info("Client %s disconnected", client_id)
import asyncio
from datetime import datetime, timezone
from typing import Dict, Optional

from fastapi import WebSocket, WebSocketDisconnect

# NOTE(review): `router` is not defined in this snippet; it is presumably the
# APIRouter created next to the other WebSocket routes - confirm.


class ConnectionManager:
    """Tracks active WebSocket connections keyed by client id."""

    def __init__(self):
        # client_id -> socket
        self.active: Dict[str, WebSocket] = {}

    async def connect(self, ws: WebSocket, client_id: str):
        """Accept *ws*, register it, and tell the other clients who joined.

        A second connection with the same ``client_id`` replaces the first
        entry in the registry.
        """
        await ws.accept()
        self.active[client_id] = ws
        await self.broadcast({
            "type": "user_joined",
            "client_id": client_id,
            "online_count": len(self.active),
        }, exclude=client_id)

    def disconnect(self, client_id: str):
        """Forget *client_id*; a no-op if it is not registered."""
        self.active.pop(client_id, None)

    async def send_to(self, client_id: str, message: dict):
        """Send *message* to one client; silently skip unknown ids."""
        ws = self.active.get(client_id)
        if ws:
            await ws.send_json(message)

    async def broadcast(self, message: dict, exclude: Optional[str] = None):
        """Send *message* to every connected client except *exclude*.

        Sockets that fail on send are dropped from the registry.
        """
        dead = []
        # Iterate over a snapshot: each `await` yields to the event loop, and
        # another coroutine may connect/disconnect meanwhile, which would
        # raise "dictionary changed size during iteration" on the live dict.
        for cid, ws in list(self.active.items()):
            if cid == exclude:
                continue
            try:
                await ws.send_json(message)
            except Exception:
                dead.append((cid, ws))  # connection died silently
        for cid, ws in dead:
            # Only evict if the id still maps to the socket that failed; the
            # client may have reconnected while we were sending.
            if self.active.get(cid) is ws:
                self.disconnect(cid)


# Single shared instance used by the route below.
manager = ConnectionManager()


@router.websocket("/ws/chat/{client_id}")
async def chat_ws(websocket: WebSocket, client_id: str):
    """Relay every chat message from *client_id* to all connected clients."""
    await manager.connect(websocket, client_id)
    try:
        while True:
            data = await websocket.receive_json()
            await manager.broadcast({
                "type": "message",
                "from": client_id,
                "text": data.get("text"),
                "timestamp": datetime.now(timezone.utc).isoformat(),
            })
    except WebSocketDisconnect:
        manager.disconnect(client_id)
        await manager.broadcast({
            "type": "user_left",
            "client_id": client_id,
        })
from collections import defaultdict
from typing import Dict

from fastapi import WebSocket, WebSocketDisconnect

# NOTE(review): `router` is not defined in this snippet; it is presumably the
# APIRouter created next to the other WebSocket routes - confirm.


class RoomManager:
    """Groups WebSocket connections into named rooms."""

    def __init__(self):
        # room_id -> {client_id: websocket}
        self.rooms: Dict[str, Dict[str, WebSocket]] = defaultdict(dict)

    async def join(self, ws: WebSocket, room_id: str, client_id: str):
        """Accept *ws*, add it to *room_id*, and announce the new member list."""
        await ws.accept()
        self.rooms[room_id][client_id] = ws
        await self.broadcast_to_room(room_id, {
            "type": "joined",
            "user": client_id,
            "members": list(self.rooms[room_id].keys()),
        })

    async def leave(self, room_id: str, client_id: str):
        """Remove *client_id* from *room_id*; delete the room once it is empty."""
        # .get() avoids materialising an empty room through the defaultdict.
        members = self.rooms.get(room_id)
        if members is None:
            return
        members.pop(client_id, None)
        if not members:
            self.rooms.pop(room_id, None)  # empty room cleanup
        else:
            await self.broadcast_to_room(room_id, {
                "type": "left",
                "user": client_id,
            })

    async def broadcast_to_room(self, room_id: str, message: dict):
        """Send *message* to every member of *room_id*.

        Members whose socket fails on send are removed from the room.
        """
        members = self.rooms.get(room_id)
        if not members:
            return
        dead = []
        # Snapshot: the room can change while we are suspended in `await`.
        for cid, ws in list(members.items()):
            try:
                await ws.send_json(message)
            except Exception:
                dead.append((cid, ws))
        for cid, ws in dead:
            if members.get(cid) is ws:
                del members[cid]
        if not members and self.rooms.get(room_id) is members:
            del self.rooms[room_id]


room_manager = RoomManager()


@router.websocket("/ws/room/{room_id}/{client_id}")
async def room_ws(ws: WebSocket, room_id: str, client_id: str):
    """Relay each message from *client_id* to everyone in *room_id*."""
    await room_manager.join(ws, room_id, client_id)
    try:
        while True:
            data = await ws.receive_json()
            await room_manager.broadcast_to_room(room_id, {
                "type": "message",
                "from": client_id,
                "text": data.get("text"),
            })
    except WebSocketDisconnect:
        await room_manager.leave(room_id, client_id)
Angular WebSocket
Service
The right way to use WebSockets in Angular — as an injectable service with RxJS Observables, so any component can subscribe to messages.
import { Injectable, OnDestroy } from '@angular/core';
import { Subject, Observable, Subscription, timer, EMPTY } from 'rxjs';
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
import { switchMap, retryWhen, delayWhen, tap, catchError, filter, retry } from 'rxjs/operators';

export interface WsMessage {
  type: string;
  [key: string]: any;
}

/**
 * App-wide WebSocket wrapper: owns one connection and re-publishes every
 * incoming message on `messages$` so that any component can subscribe.
 */
@Injectable({ providedIn: 'root' })
export class WebSocketService implements OnDestroy {
  // Undefined whenever there is no live connection. `WebSocketSubject.closed`
  // is not used as the "is connected" test because it does not become true
  // after complete(), which made connect() a no-op after disconnect().
  private socket$?: WebSocketSubject<WsMessage>;
  private connection?: Subscription;
  private messagesSubject$ = new Subject<WsMessage>();

  /** Stream of every message received from the server. */
  messages$ = this.messagesSubject$.asObservable();

  /** Open the connection to `url`; does nothing if one is already open. */
  connect(url: string): void {
    if (this.socket$) return;

    const socket$ = webSocket<WsMessage>({
      url,
      openObserver: { next: () => console.log('WebSocket connected') },
      closeObserver: { next: () => console.log('WebSocket disconnected') }
    });
    this.socket$ = socket$;

    this.connection = socket$.pipe(
      // Auto-reconnect on error: resubscribing opens a fresh socket.
      // retry({ delay }) is the RxJS 7+ replacement for retryWhen.
      retry({
        delay: () => {
          console.log('⚠️ WS error, reconnecting in 2s...');
          return timer(2000);
        }
      })
    ).subscribe({
      next: (msg) => this.messagesSubject$.next(msg),
      error: (err) => console.error('WS error:', err),
      // A clean close completes the stream; clear the handle so a later
      // connect() can open a new socket.
      complete: () => {
        if (this.socket$ === socket$) {
          this.socket$ = undefined;
          this.connection = undefined;
        }
      }
    });
  }

  /** Send `message`, or log a warning when there is no connection. */
  send(message: WsMessage): void {
    if (this.socket$) {
      // WebSocketSubject buffers messages sent before the socket is open.
      this.socket$.next(message);
    } else {
      console.warn('WebSocket not connected');
    }
  }

  /** Close the connection (if any) and allow a later connect(). */
  disconnect(): void {
    this.socket$?.complete();
    this.connection?.unsubscribe();
    this.socket$ = undefined;
    this.connection = undefined;
  }

  /** Messages whose `type` equals `type`, typed as `T` by the caller. */
  on<T = WsMessage>(type: string): Observable<T> {
    return this.messages$.pipe(
      filter(msg => msg.type === type)
    ) as Observable<T>;
  }

  ngOnDestroy(): void {
    this.disconnect();
  }
}
import { Component, OnInit, OnDestroy, inject } from '@angular/core';
import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';
import { WebSocketService } from '../services/websocket.service';

/** Minimal chat view backed by the shared WebSocketService. */
@Component({
  selector: 'app-chat',
  template: `
    <div *ngFor="let msg of messages">
      <strong>{{ msg.from }}:</strong> {{ msg.text }}
    </div>
    <input [(ngModel)]="newMessage" (keyup.enter)="send()" />
    <button (click)="send()">Send</button>
    <span>{{ onlineCount }} online</span>
  `
})
export class ChatComponent implements OnInit, OnDestroy {
  private ws = inject(WebSocketService);
  private destroy$ = new Subject<void>();

  messages: any[] = [];
  newMessage = '';
  onlineCount = 0;
  clientId = crypto.randomUUID().slice(0, 8);

  ngOnInit(): void {
    this.ws.connect(`ws://localhost:8000/ws/chat/${this.clientId}`);

    // One subscription handles every message type. Subscribing per type with
    // ws.on(...) AND to messages$ would process each message twice (for
    // example, every chat line would be pushed into `messages` two times).
    this.ws.messages$.pipe(
      takeUntil(this.destroy$)
    ).subscribe((msg: any) => {
      switch (msg.type) {
        case 'message':
          this.messages.push(msg);
          break;
        case 'user_joined':
          this.onlineCount = msg.online_count;
          break;
        case 'user_left':
          // Never show a negative count if a "left" arrives before any "joined".
          this.onlineCount = Math.max(0, this.onlineCount - 1);
          break;
      }
    });
  }

  /** Send the current input as a chat message; ignores blank input. */
  send(): void {
    if (!this.newMessage.trim()) return;
    this.ws.send({ type: 'message', text: this.newMessage });
    this.newMessage = '';
  }

  ngOnDestroy(): void {
    this.destroy$.next();       // unsubscribe from all observables
    this.destroy$.complete();
    // This component opened the connection, so it closes it as well.
    this.ws.disconnect();
  }
}
import { Injectable } from '@angular/core';
import { webSocket } from 'rxjs/webSocket';
import { retry, delay, share, filter, map } from 'rxjs/operators';
// `environment` and `ChatMessage` are project files; their import paths
// depend on the project layout.

/** WebSocket service that shares one connection between all subscribers. */
@Injectable({ providedIn: 'root' })
export class WsService {
  private readonly WS_URL = environment.wsUrl;   // from environment.ts

  // Keep the raw WebSocketSubject: it is the only object with next().
  // The result of .pipe() is a plain Observable and cannot send.
  private readonly socket = webSocket<any>({
    url: this.WS_URL,
    serializer: (msg) => JSON.stringify(msg),
    deserializer: ({ data }) => JSON.parse(data),
  });

  private socket$ = this.socket.pipe(
    retry({ count: 5, delay: 2000 }),  // retry 5 times, 2s apart
    share()  // ONE shared connection, even if many components subscribe
  );

  /** Chat messages only. */
  chatMessages$ = this.socket$.pipe(
    filter((m: any) => m.type === 'message'),
    map((m: any) => m as ChatMessage)
  );

  /** Notification messages only. */
  notifications$ = this.socket$.pipe(
    filter((m: any) => m.type === 'notification')
  );

  /** Join/leave events. */
  presence$ = this.socket$.pipe(
    filter((m: any) => ['user_joined', 'user_left'].includes(m.type))
  );

  /** Send `msg` to the server through the underlying WebSocketSubject. */
  send(msg: object): void {
    this.socket.next(msg);
  }
}

// environment.ts
export const environment = {
  production: false,
  apiUrl: 'http://localhost:8000',
  wsUrl: 'ws://localhost:8000/ws'  // wss:// for production
};
Live Chat App —
End to End
Complete working chat: Angular frontend + FastAPI backend. Every file you need.
import json
from datetime import datetime, timezone

from fastapi import APIRouter, WebSocket, WebSocketDisconnect

router = APIRouter()

HISTORY_LIMIT = 50      # messages kept for replay to new joiners
MAX_TEXT_LENGTH = 500   # characters accepted per chat message


# -- Connection Manager ------------------------------------------------
class ChatManager:
    """Holds the connected users and a bounded chat history."""

    def __init__(self):
        self.connections: dict[str, WebSocket] = {}
        self.message_history: list = []  # trimmed to the last HISTORY_LIMIT

    async def connect(self, ws: WebSocket, username: str):
        """Accept *ws*, replay recent history to it, then announce the join.

        A second connection with the same ``username`` replaces the first
        entry in ``connections``.
        """
        await ws.accept()
        self.connections[username] = ws
        # Send history to the new joiner only.
        await ws.send_json({
            "type": "history",
            "messages": self.message_history[-HISTORY_LIMIT:],
        })
        # Announce to everyone, including the joiner.
        await self.broadcast({
            "type": "system",
            "text": f"{username} joined",
            "online": list(self.connections.keys()),
        })

    def disconnect(self, username: str):
        """Forget *username*; a no-op if it is not connected."""
        self.connections.pop(username, None)

    async def broadcast(self, msg: dict):
        """Timestamp *msg* in place, record chat messages, send to everyone."""
        msg["timestamp"] = datetime.now(timezone.utc).isoformat()
        if msg.get("type") == "message":
            self.message_history.append(msg)
            # Keep memory bounded: drop everything older than the limit.
            del self.message_history[:-HISTORY_LIMIT]
        stale = []
        # Snapshot: the registry can change while we are suspended in `await`.
        for uname, ws in list(self.connections.items()):
            try:
                await ws.send_json(msg)
            except Exception:
                stale.append((uname, ws))
        for uname, ws in stale:
            # Do not evict a user who reconnected while we were sending.
            if self.connections.get(uname) is ws:
                self.disconnect(uname)


chat = ChatManager()


@router.websocket("/ws/chat")
async def chat_endpoint(ws: WebSocket, username: str):
    """Chat socket; ``username`` is read from the query string.

    Handles ``{"type": "message", "text": ...}`` and ``{"type": "typing"}``;
    every other payload is ignored.
    """
    await chat.connect(ws, username)
    try:
        while True:
            data = await ws.receive_json()
            if not isinstance(data, dict):
                continue  # valid JSON but not an object: nothing to route
            action = data.get("type")

            if action == "message":
                text = data.get("text", "")
                if not isinstance(text, str):
                    text = ""  # null / numbers would break the slice below
                await chat.broadcast({
                    "type": "message",
                    "from": username,
                    "text": text[:MAX_TEXT_LENGTH],  # limit length
                })
            elif action == "typing":
                await chat.broadcast({
                    "type": "typing",
                    "user": username,
                    # Same value under "from" so that clients reading the
                    # sender the way they do for chat messages also work.
                    "from": username,
                })
    except WebSocketDisconnect:
        chat.disconnect(username)
        await chat.broadcast({
            "type": "system",
            "text": f"{username} left",
            "online": list(chat.connections.keys()),
        })
import { Injectable, OnDestroy } from '@angular/core';
import { Subject, BehaviorSubject, Observable, Subscription } from 'rxjs';
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
import { retry, takeUntil, filter, share } from 'rxjs/operators';

export interface ChatMessage {
  type: string;
  from?: string;
  text?: string;
  timestamp?: string;
  online?: string[];
}

/** Chat connection: exposes messages, the online list and connection state. */
@Injectable({ providedIn: 'root' })
export class ChatService implements OnDestroy {
  private destroy$ = new Subject<void>();
  // Undefined until connect() has been called.
  private socket$?: WebSocketSubject<ChatMessage>;
  private connection?: Subscription;

  messages$ = new Subject<ChatMessage>();
  onlineUsers$ = new BehaviorSubject<string[]>([]);
  isConnected$ = new BehaviorSubject<boolean>(false);

  /** Open the chat socket as `username`, replacing any earlier connection. */
  connect(username: string): void {
    // Close a previous socket first so that a second call does not leak it.
    this.connection?.unsubscribe();
    this.socket$?.complete();

    // Encode the name: characters such as "&", "#" or spaces would otherwise
    // corrupt the query string.
    const wsUrl = `ws://localhost:8000/ws/chat?username=${encodeURIComponent(username)}`;

    const socket$ = webSocket<ChatMessage>({
      url: wsUrl,
      openObserver: { next: () => this.isConnected$.next(true) },
      closeObserver: { next: () => this.isConnected$.next(false) }
    });
    this.socket$ = socket$;

    this.connection = socket$.pipe(
      retry({ count: 10, delay: 2000 }),
      takeUntil(this.destroy$)
    ).subscribe(msg => {
      this.messages$.next(msg);
      if (msg.online) this.onlineUsers$.next(msg.online);
    });
  }

  /** Send a chat message; ignored when connect() has not been called. */
  sendMessage(text: string): void {
    this.socket$?.next({ type: 'message', text });
  }

  /** Tell the server this user is typing; ignored when not connected. */
  sendTyping(): void {
    this.socket$?.next({ type: 'typing' });
  }

  ngOnDestroy(): void {
    this.destroy$.next();
    this.destroy$.complete();
    this.socket$?.complete();
  }
}
/** Chat screen: join form, message list, typing indicator and input box. */
@Component({
  selector: 'app-chat',
  templateUrl: './chat.component.html'
})
export class ChatComponent implements OnInit, OnDestroy {
  private chat = inject(ChatService);
  private destroy$ = new Subject<void>();
  // Handle of the pending "clear typing indicator" timeout, if any.
  private typingTimer?: ReturnType<typeof setTimeout>;

  messages: ChatMessage[] = [];
  newMsg = '';
  username = '';
  joined = false;
  typingUser = '';
  onlineUsers: string[] = [];

  isConnected$ = this.chat.isConnected$;
  onlineUsers$ = this.chat.onlineUsers$;

  ngOnInit(): void {
    this.chat.messages$.pipe(
      takeUntil(this.destroy$)
    ).subscribe(msg => {
      switch (msg.type) {
        case 'message':
        case 'system':
          this.messages.push(msg);
          break;
        case 'history':
          this.messages = (msg as any).messages ?? [];
          break;
        case 'typing':
          // The backend in this tutorial sends the sender under "user";
          // accept "from" as well.
          this.typingUser = msg.from ?? (msg as any).user ?? '';
          // Restart the countdown: without clearing, an older timeout would
          // hide the indicator while someone is still typing.
          clearTimeout(this.typingTimer);
          this.typingTimer = setTimeout(() => this.typingUser = '', 2000);
          break;
      }
    });
  }

  /** Connect under the entered name; ignores a blank name. */
  join(): void {
    if (!this.username.trim()) return;
    this.chat.connect(this.username);
    this.joined = true;
  }

  /** Send the current input as a chat message; ignores blank input. */
  send(): void {
    if (!this.newMsg.trim()) return;
    this.chat.sendMessage(this.newMsg);
    this.newMsg = '';
  }

  /** Called on every input event; notifies the server that we are typing. */
  onTyping(): void {
    this.chat.sendTyping();
  }

  ngOnDestroy(): void {
    clearTimeout(this.typingTimer);
    this.destroy$.next();
    this.destroy$.complete();
  }
}
<div *ngIf="!joined">
  <input [(ngModel)]="username" placeholder="Your name" />
  <button (click)="join()">Join Chat</button>
</div>

<div *ngIf="joined">
  <!-- Connection status -->
  <span [class.green]="isConnected$ | async">
    {{ (isConnected$ | async) ? '🟢 Connected' : '🔴 Reconnecting...' }}
  </span>

  <!-- Online users -->
  <div>Online: {{ onlineUsers$ | async | json }}</div>

  <!-- Messages -->
  <div *ngFor="let msg of messages">
    <ng-container [ngSwitch]="msg.type">
      <div *ngSwitchCase="'message'">
        <strong>{{ msg.from }}:</strong> {{ msg.text }}
        <small>{{ msg.timestamp | date:'HH:mm' }}</small>
      </div>
      <div *ngSwitchCase="'system'" class="system-msg">
        ℹ️ {{ msg.text }}
      </div>
    </ng-container>
  </div>

  <!-- Typing indicator -->
  <div *ngIf="typingUser">{{ typingUser }} is typing...</div>

  <!-- Input -->
  <input [(ngModel)]="newMsg"
         (keyup.enter)="send()"
         (input)="onTyping()"
         placeholder="Type a message..." />
  <button (click)="send()">Send</button>
</div>
Server-Side Push
Notifications
Server pushes updates to clients — no client polling required. Use for notifications, task completion, alerts.
import asyncio
import json

from fastapi import APIRouter, Depends, Request
from fastapi.responses import StreamingResponse

router = APIRouter()

# Seconds between polls for new notifications. A heartbeat comment is sent on
# every cycle, so this is also the heartbeat interval.
POLL_INTERVAL_SECONDS = 5


# SSE = one-way push (server -> client only). Simpler than WebSocket.
# Use for: notifications, progress bars, live logs, dashboards.
@router.get("/events")
async def event_stream(request: Request, current_user=Depends(get_current_user)):
    """Stream the current user's pending notifications as Server-Sent Events.

    Returns:
        A ``text/event-stream`` response that stays open until the client
        disconnects.
    """

    async def generate():
        try:
            while True:
                # Stop producing once the client has gone away.
                if await request.is_disconnected():
                    break

                # Fetch pending notifications for this user.
                notifications = await get_notifications(current_user.id)
                for notif in notifications:
                    # SSE frame format: "data: {json}\n\n"
                    yield f"data: {json.dumps(notif)}\n\n"
                    await mark_sent(notif['id'])

                # Lines starting with ":" are SSE comments; clients ignore
                # them, but they keep proxies from closing an idle stream.
                yield ": heartbeat\n\n"
                await asyncio.sleep(POLL_INTERVAL_SECONDS)
        except asyncio.CancelledError:
            # Propagate cancellation instead of swallowing it so that the
            # server's task machinery sees the task as cancelled.
            raise

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # disable Nginx buffering
            "Connection": "keep-alive",
        },
    )
from typing import Dict

from fastapi import Depends, WebSocket, WebSocketDisconnect

# NOTE(review): `router`, `User`, `get_ws_user`, `DBSession`, `AdminOnly` and
# `ship_order_in_db` are not defined in this snippet; they are presumably
# provided elsewhere in the project - confirm.

# Global registry: user_id -> websocket (for pushing to specific users).
# Only one socket is kept per user; a newer connection replaces an older one.
user_sockets: Dict[int, WebSocket] = {}


@router.websocket("/ws/notifications")
async def notification_ws(ws: WebSocket, user: User = Depends(get_ws_user)):
    """Register the user's socket and answer ``ping`` with ``pong``."""
    await ws.accept()
    user_sockets[user.id] = ws
    try:
        while True:
            # Keep alive: the client may send "ping" at any time.
            data = await ws.receive_text()
            if data == "ping":
                await ws.send_text("pong")
    except WebSocketDisconnect:
        # Only unregister our own socket: if the user has since reconnected
        # (e.g. from another tab), the registry already holds the newer one.
        if user_sockets.get(user.id) is ws:
            user_sockets.pop(user.id, None)


async def push_to_user(user_id: int, notification: dict):
    """Send *notification* to *user_id* if connected; callable from any route.

    Best effort: a failed send drops the socket from the registry instead of
    raising to the caller.
    """
    ws = user_sockets.get(user_id)
    if ws:
        try:
            await ws.send_json(notification)
        except Exception:
            if user_sockets.get(user_id) is ws:
                user_sockets.pop(user_id, None)


# Example: push a notification when an order is shipped.
@router.post("/orders/{order_id}/ship")
async def ship_order(order_id: int, db: DBSession, admin: AdminOnly):
    """Mark the order as shipped and notify its owner in real time."""
    order = ship_order_in_db(db, order_id)

    await push_to_user(order.user_id, {
        "type": "notification",
        "title": "Your order shipped!",
        "order_id": order_id,
    })
    return order
import { Injectable } from '@angular/core';
import { Observable, fromEvent, Subject } from 'rxjs';
import { map, takeUntil } from 'rxjs/operators';

/** Wraps the browser EventSource (Server-Sent Events) in an Observable. */
@Injectable({ providedIn: 'root' })
export class SseService {
  // Most recently opened source; used only by disconnect().
  private eventSource?: EventSource;

  /**
   * Open an SSE stream on subscribe and emit each event's parsed JSON payload.
   * Unsubscribing closes the stream.
   */
  connect(url: string): Observable<any> {
    return new Observable(observer => {
      // Local handle: each subscription owns its own EventSource, so a
      // second connect() cannot make this teardown close the wrong stream.
      const source = new EventSource(url);
      this.eventSource = source;

      source.onmessage = (event) => {
        let payload: any;
        try {
          payload = JSON.parse(event.data);
        } catch (err) {
          // A malformed frame must not throw inside the event handler.
          console.error('SSE: ignoring non-JSON message', err);
          return;
        }
        observer.next(payload);
      };

      source.onerror = (err) => {
        if (source.readyState === EventSource.CLOSED) {
          // The browser has given up on this stream; it will not retry.
          observer.error(err);
          return;
        }
        // Otherwise the browser reconnects on its own.
        console.error('SSE error, will retry...', err);
      };

      // Cleanup when unsubscribed
      return () => {
        source.close();
        if (this.eventSource === source) this.eventSource = undefined;
      };
    });
  }

  /** Close the most recently opened stream, if any. */
  disconnect(): void {
    this.eventSource?.close();
  }
}

// Usage in component:
// this.sseService.connect('http://localhost:8000/events')
//   .pipe(takeUntil(this.destroy$))
//   .subscribe(notification => this.notifications.push(notification));
Live Dashboard &
ML Streaming
Stream ML predictions, live sensor data, or real-time analytics — perfect for combining your ML learning with Angular.
import asyncio
from datetime import datetime, timezone

import numpy as np
from fastapi import WebSocket, WebSocketDisconnect

# NOTE(review): `router`, `load_ml_model` and the `get_*` metric helpers are
# not defined in this snippet; presumably provided elsewhere - confirm.

# Load the model once at import time (not per request).
model = load_ml_model()


@router.websocket("/ws/predict")
async def predict_stream(ws: WebSocket):
    """Answer each ``{"features": [...]}`` message with a model prediction.

    Malformed requests get an ``{"error": ...}`` reply and the connection
    stays open.
    """
    await ws.accept()
    # get_running_loop() is the supported call inside a coroutine.
    loop = asyncio.get_running_loop()
    try:
        while True:
            payload = await ws.receive_json()
            request_id = payload.get("request_id") if isinstance(payload, dict) else None
            try:
                features = np.asarray(payload["features"], dtype=float)
            except (KeyError, TypeError, ValueError):
                # Missing key, non-object payload, or non-numeric values.
                await ws.send_json({
                    "error": "Expected a JSON object with a numeric 'features' list",
                    "request_id": request_id,
                })
                continue

            # model.predict is presumably blocking; run it in the default
            # thread pool so the event loop keeps serving other clients.
            result = await loop.run_in_executor(
                None, model.predict, features.reshape(1, -1)
            )

            await ws.send_json({
                "prediction": result.tolist()[0],
                # NOTE(review): this is the largest value in the prediction
                # output, not necessarily a probability - confirm it matches
                # what the model returns.
                "confidence": float(result.max()),
                "request_id": request_id,
            })
    except WebSocketDisconnect:
        pass


@router.websocket("/ws/dashboard")
async def dashboard_stream(ws: WebSocket):
    """Push a metrics snapshot to the client once per second."""
    await ws.accept()
    try:
        while True:
            # Fetch all four metrics concurrently.
            metrics = await asyncio.gather(
                get_active_users(),
                get_revenue_today(),
                get_error_rate(),
                get_prediction_accuracy(),
            )

            await ws.send_json({
                "type": "metrics",
                "active_users": metrics[0],
                "revenue": metrics[1],
                "error_rate": metrics[2],
                "model_accuracy": metrics[3],
                "timestamp": datetime.now(timezone.utc).isoformat(),
            })
            await asyncio.sleep(1)  # push every second
    except WebSocketDisconnect:
        pass
/** Live dashboard fed by the metrics WebSocket stream. */
@Component({
  selector: 'app-dashboard',
  templateUrl: './dashboard.component.html'
})
export class DashboardComponent implements OnInit, OnDestroy {
  private ws = inject(WebSocketService);
  private destroy$ = new Subject<void>();

  metrics = { activeUsers: 0, revenue: 0, errorRate: 0, accuracy: 0 };
  history: any[] = [];  // for chart

  ngOnInit(): void {
    this.ws.connect('ws://localhost:8000/ws/dashboard');

    this.ws.messages$.pipe(
      takeUntil(this.destroy$)
    ).subscribe((msg: any) => {
      // The shared service re-publishes every message it receives; anything
      // that is not a metrics snapshot would overwrite the tiles with
      // `undefined`.
      if (msg.type !== 'metrics') return;

      this.metrics = {
        activeUsers: msg.active_users,
        revenue: msg.revenue,
        errorRate: msg.error_rate,
        accuracy: msg.model_accuracy
      };
      // Keep last 60 points for sparkline chart
      this.history.push({ t: new Date(msg.timestamp), ...msg });
      if (this.history.length > 60) this.history.shift();
    });
  }

  /**
   * Request an ML prediction on demand.
   * NOTE(review): this goes out on the dashboard connection; confirm the
   * server endpoint behind it reads incoming messages.
   */
  predict(features: number[]): void {
    this.ws.send({ type: 'predict', features, request_id: Date.now() });
  }

  ngOnDestroy(): void {
    this.destroy$.next();
    this.destroy$.complete();
  }
}
Auth & JWT over
WebSocket
The browser WebSocket API can't attach custom headers such as Authorization to the handshake. Here's how to authenticate WebSocket connections properly.
import asyncio

from fastapi import WebSocket, WebSocketDisconnect

# NOTE(review): `router`, `DBSession` and `verify_token` are not defined in
# this snippet; presumably provided elsewhere in the project - confirm.


# -- METHOD 1: token in the query string (simple, OK for internal tools) --
# Client URL: ws://localhost:8000/ws?token=eyJhbGci...
@router.websocket("/ws")
async def ws_query_auth(ws: WebSocket, token: str, db: DBSession):
    """Authenticate with a ``token`` query parameter before accepting."""
    user = verify_token(token, db)  # validate BEFORE accept()
    if not user:
        await ws.close(code=1008)  # 1008 = Policy Violation
        return

    await ws.accept()
    try:
        # Keep the connection open; returning right after accept() would end
        # the handler and drop the client.
        while True:
            data = await ws.receive_json()
            # user is authenticated, process message
    except WebSocketDisconnect:
        pass


# -- METHOD 2: first message carries the token (keeps it out of the URL) --
@router.websocket("/ws/secure")
async def ws_msg_auth(ws: WebSocket, db: DBSession):
    """Accept, then require ``{"type": "auth", "token": ...}`` within 5 s."""
    await ws.accept()  # must accept before anything can be received

    try:
        # The auth message has to arrive within 5 seconds.
        auth_msg = await asyncio.wait_for(ws.receive_json(), timeout=5.0)

        if not isinstance(auth_msg, dict) or auth_msg.get("type") != "auth":
            await ws.send_json({"error": "First message must be auth"})
            await ws.close(code=1008)
            return

        # .get(): a missing "token" must be rejected, not raise KeyError.
        token = auth_msg.get("token")
        user = verify_token(token, db) if token else None
        if not user:
            await ws.send_json({"error": "Invalid token"})
            await ws.close(code=1008)
            return

        await ws.send_json({"type": "auth_ok", "user": user.name})

        # Now handle real messages
        while True:
            data = await ws.receive_json()
            # user is authenticated, process message

    except asyncio.TimeoutError:
        await ws.close(code=1008)
    except WebSocketDisconnect:
        pass
// Method 1 - token in URL (not ideal, logs may capture it)
const token = localStorage.getItem('access_token');
if (token) {
  // Encode the token so that reserved characters cannot break the URL.
  this.ws.connect(`ws://localhost:8000/ws?token=${encodeURIComponent(token)}`);
}

// Method 2 - token as first message (preferred)
@Injectable({ providedIn: 'root' })
export class AuthWsService {
  // AuthService is the app's token provider (project-defined).
  private authService = inject(AuthService);
  private socket$!: WebSocketSubject<any>;

  /** Messages received after the socket is open. */
  messages$ = new Subject<any>();

  /** Open the secure socket and send the auth message as soon as it opens. */
  connect(): void {
    const socket$: WebSocketSubject<any> = webSocket({
      url: 'ws://localhost:8000/ws/secure',
      // Runs on every (re)open. Waiting for the first *incoming* message
      // (take(1)) would deadlock: the server sends nothing until it has
      // received the auth message.
      openObserver: {
        next: () => socket$.next({
          type: 'auth',
          token: this.authService.getToken()
        })
      }
    });
    this.socket$ = socket$;

    // The socket is opened by the first subscription and stays open for as
    // long as that subscription lives.
    socket$.subscribe({
      next: (msg) => this.messages$.next(msg),
      error: (err) => console.error('WS error:', err)
    });
  }
}
Reconnection &
Error Handling
/**
 * WebSocket client built on the native WebSocket API with automatic
 * reconnect (exponential backoff), a heartbeat, and an offline send queue.
 */
@Injectable({ providedIn: 'root' })
export class RobustWsService {
  private socket?: WebSocket;  // native WebSocket for max control
  private url = '';
  private reconnectAttempts = 0;
  private maxReconnect = 10;
  private pingInterval?: any;
  private reconnectTimer?: any;
  private messageQueue: any[] = [];

  messages$ = new Subject<any>();
  status$ = new BehaviorSubject<'connected'|'disconnected'|'reconnecting'>('disconnected');

  /** Connect to `url`, starting with a fresh reconnect budget. */
  connect(url: string): void {
    this.url = url;
    this.reconnectAttempts = 0;
    this._connect();
  }

  private _connect(): void {
    clearTimeout(this.reconnectTimer);
    this.reconnectTimer = undefined;

    this.socket = new WebSocket(this.url);

    this.socket.onopen = () => {
      this.reconnectAttempts = 0;
      this._setStatus('connected');

      // Flush messages queued while offline.
      while (this.messageQueue.length) {
        this.socket!.send(JSON.stringify(this.messageQueue.shift()));
      }

      // Heartbeat ping every 30s; clear first so that only one runs.
      clearInterval(this.pingInterval);
      this.pingInterval = setInterval(() => {
        if (this.socket?.readyState === WebSocket.OPEN) {
          this.socket.send('ping');
        }
      }, 30000);
    };

    this.socket.onmessage = ({ data }) => {
      if (data === 'pong') return;  // heartbeat response
      let parsed: any;
      try {
        parsed = JSON.parse(data);
      } catch (err) {
        // A malformed frame must not throw inside the event handler.
        console.error('WS: ignoring non-JSON message', err);
        return;
      }
      this.messages$.next(parsed);
    };

    this.socket.onclose = (event) => {
      clearInterval(this.pingInterval);
      if (event.code === 1000) {
        // Intentional close: report it and do not reconnect.
        this._setStatus('disconnected');
        return;
      }
      this._scheduleReconnect();
    };

    this.socket.onerror = () => {
      // Closing routes every failure through onclose, which decides whether
      // to reconnect.
      this.socket?.close();
    };
  }

  private _scheduleReconnect(): void {
    if (this.reconnectAttempts >= this.maxReconnect) {
      this._setStatus('disconnected');
      console.error('Max reconnect attempts reached');
      return;
    }

    this._setStatus('reconnecting');
    // Exponential backoff: 1s, 2s, 4s, 8s, 16s... (max 30s)
    const delay = Math.min(1000 * Math.pow(2, this.reconnectAttempts), 30000);
    this.reconnectAttempts++;

    this.reconnectTimer = setTimeout(() => this._connect(), delay);
  }

  /** Emit `status` only when it differs from the current one. */
  private _setStatus(status: 'connected'|'disconnected'|'reconnecting'): void {
    if (this.status$.value !== status) this.status$.next(status);
  }

  /** Send `msg` now if the socket is open, otherwise queue it for the next open. */
  send(msg: object): void {
    if (this.socket?.readyState === WebSocket.OPEN) {
      this.socket.send(JSON.stringify(msg));
    } else {
      this.messageQueue.push(msg);
    }
  }

  /** Close for good: cancels a pending reconnect and closes with code 1000. */
  disconnect(): void {
    // Without this, a reconnect already scheduled would reopen the socket
    // after the caller asked to disconnect.
    clearTimeout(this.reconnectTimer);
    this.reconnectTimer = undefined;
    this.socket?.close(1000);  // 1000 = Normal closure (won't trigger reconnect)
    this._setStatus('disconnected');
  }
}
RxJS WebSocket —
The Complete Pattern
Combine RxJS operators with WebSockets for powerful reactive real-time data pipelines.
import { EMPTY, Subject } from 'rxjs';  // EMPTY and Subject live in 'rxjs', not 'rxjs/operators'
import { webSocket } from 'rxjs/webSocket';
import {
  retry, debounceTime, throttleTime, distinctUntilChanged,
  bufferTime, scan, filter, map, share, catchError, tap
} from 'rxjs/operators';

// Keep the raw WebSocketSubject for sending: the result of .pipe() is a
// plain Observable and has no next().
const rawSocket$ = webSocket<any>('ws://localhost:8000/ws/dashboard');

const socket$ = rawSocket$.pipe(
  retry({ count: 5, delay: 2000 }),
  share()  // IMPORTANT: share prevents multiple WS connections per subscription
);

// -- Pattern 1: Rate limit UI updates --------------------------------
// Server may send many updates per second; re-render at most every 200ms.
const throttledMetrics$ = socket$.pipe(
  filter((m: any) => m.type === 'metrics'),  // matches the type the dashboard endpoint sends
  throttleTime(200)  // only take first per 200ms window
);

// -- Pattern 2: Accumulate into array (for chart history) ------------
const chartData$ = socket$.pipe(
  filter((m: any) => m.type === 'metrics'),
  scan((history: any[], msg) => {
    const next = [...history, msg];
    return next.slice(-60);  // keep last 60 points
  }, [])
);

// -- Pattern 3: Search with debounce via WS --------------------------
// User types -> debounce -> send to WS -> get results
const searchQuery$ = new Subject<string>();

searchQuery$.pipe(
  debounceTime(300),       // wait 300ms after user stops typing
  distinctUntilChanged(),  // skip if same as last value
  tap(query => rawSocket$.next({ type: 'search', query }))  // send to WS
).subscribe();

const searchResults$ = socket$.pipe(
  filter((m: any) => m.type === 'search_results'),
  map((m: any) => m.results)
);

// -- Pattern 4: Group messages into batches --------------------------
const batched$ = socket$.pipe(
  bufferTime(500),                    // collect all messages in 500ms windows
  filter(batch => batch.length > 0)   // skip empty windows
);

// -- Pattern 5: Handle errors gracefully -----------------------------
// Reached only after retry above has given up.
const safe$ = socket$.pipe(
  catchError(err => {
    console.error('WS error:', err);
    return EMPTY;  // complete gracefully
  })
);
- Always unsubscribe in ngOnDestroy using takeUntil(destroy$) or async pipe
- Use share() operator so multiple components don't open multiple connections
- Send structured messages with a "type" field — makes routing easy
- Implement heartbeat (ping/pong) — some proxies close idle connections after 60s
- Use wss:// (WebSocket Secure = WebSocket over TLS) in production
- Use exponential backoff for reconnection — don't hammer the server