๐Ÿ“ก Start Here

HTTP vs WebSocket โ€”
The Core Difference

Know when to use WebSockets vs regular HTTP calls. Choose the wrong one and you'll either waste resources or miss real-time capabilities.

The Fundamental Difference
CONCEPT
HTTP is like sending a letter: you write it, send it, wait for a reply letter. Every message = a new envelope, stamp, and round trip. WebSocket is like a phone call: one connection opened, both sides talk freely, no new envelope each time.
๐ŸŒ HTTP โ€” Request/Response

Client always initiates. Server can only respond. Each request opens a new connection (or reuses one briefly). Works great for: fetching data on demand, form submissions, CRUD APIs. Angular: HttpClient.get(), post().

โšก WebSocket โ€” Full-Duplex

One connection, both sides can send at any time. Server can push to client WITHOUT the client asking. Works great for: chat, live scores, stock prices, multiplayer games, ML model streaming results, dashboards.

Use WebSocket when...
  • Server needs to PUSH data to client (not just respond)
  • Very frequent updates (more than once per 2 seconds)
  • Real-time collaboration (multiple users affecting same data)
  • Low latency matters (games, trading, auctions)
  • Long-running operations with progress (ML training, file uploads)
Use regular HTTP for: user profile fetching, login/register, form submissions, one-time data loads. Connecting WebSocket for these is wasteful overkill.
connection lifecycleconcept
// HTTP
Client: GET /users             โ†’ new TCP connection (or reuse)
Server:                       โ† sends response
-- connection may close --
Client: GET /users/1           โ†’ new request (could be same connection)
Server:                       โ† sends response

// WebSocket
Client: GET /ws (Upgrade: websocket)    โ†’ HTTP handshake
Server: 101 Switching Protocols        โ† upgrade confirmed
-- SINGLE CONNECTION STAYS OPEN --
Client: {"type": "join", "room": "chat"}  โ†’ send anytime
Server: {"type": "message", "text": "hi"} โ† push anytime
Server: {"type": "user_joined"}           โ† push anytime
Client: {"type": "message", "text": "yo"} โ†’ send anytime
-- connection closes when either side disconnects --
Backend

FastAPI WebSocket
Server

FastAPI has native async WebSocket support. No libraries needed beyond FastAPI itself.

WebSocket Endpoint โ€” From Simple to Full
FASTAPI
Simple Echo
Connection Manager
Rooms
app/routers/ws.pyPython
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
import json, logging

router = APIRouter()

# Simple echo server โ€” reflects back what client sends
@router.websocket("/ws/echo")
async def ws_echo(websocket: WebSocket):
    await websocket.accept()   # accept the connection
    try:
        while True:
            # Receive โ€” text, bytes, or JSON
            data = await websocket.receive_text()       # raw string
            # data = await websocket.receive_json()    # parse JSON
            # data = await websocket.receive_bytes()   # binary

            # Send โ€” text, bytes, or JSON
            await websocket.send_text(f"Echo: {data}")
            # await websocket.send_json({"echo": data})

    except WebSocketDisconnect:
        # Client disconnected โ€” normal, handle gracefully
        logging.info("Client disconnected")

# Path parameters work with WebSockets too!
@router.websocket("/ws/{client_id}")
async def ws_with_id(websocket: WebSocket, client_id: str):
    await websocket.accept()
    try:
        while True:
            msg = await websocket.receive_json()
            await websocket.send_json({
                "from": client_id,
                "data": msg,
                "timestamp": datetime.utcnow().isoformat()
            })
    except WebSocketDisconnect:
        logging.info(f"Client {client_id} disconnected")
app/ws_manager.py โ€” broadcast to all connected clientsPython
from fastapi import WebSocket
from typing import Dict
import asyncio

class ConnectionManager:
    """Manages all active WebSocket connections"""

    def __init__(self):
        self.active: Dict[str, WebSocket] = {}  # client_id โ†’ socket

    async def connect(self, ws: WebSocket, client_id: str):
        await ws.accept()
        self.active[client_id] = ws
        await self.broadcast({
            "type": "user_joined",
            "client_id": client_id,
            "online_count": len(self.active)
        }, exclude=client_id)

    def disconnect(self, client_id: str):
        self.active.pop(client_id, None)

    async def send_to(self, client_id: str, message: dict):
        """Send to specific client"""
        ws = self.active.get(client_id)
        if ws:
            await ws.send_json(message)

    async def broadcast(self, message: dict, exclude: str = None):
        """Send to ALL connected clients"""
        dead = []
        for cid, ws in self.active.items():
            if cid == exclude: continue
            try:
                await ws.send_json(message)
            except Exception:
                dead.append(cid)  # connection died silently
        for cid in dead:
            self.disconnect(cid)

# Singleton instance
manager = ConnectionManager()

@router.websocket("/ws/chat/{client_id}")
async def chat_ws(websocket: WebSocket, client_id: str):
    await manager.connect(websocket, client_id)
    try:
        while True:
            data = await websocket.receive_json()
            await manager.broadcast({
                "type": "message",
                "from": client_id,
                "text": data.get("text"),
                "timestamp": datetime.utcnow().isoformat()
            })
    except WebSocketDisconnect:
        manager.disconnect(client_id)
        await manager.broadcast({
            "type": "user_left",
            "client_id": client_id
        })
ws_manager.py โ€” roomsPython
from collections import defaultdict

class RoomManager:
    def __init__(self):
        # room_id โ†’ {client_id: websocket}
        self.rooms: Dict[str, Dict[str, WebSocket]] = defaultdict(dict)

    async def join(self, ws: WebSocket, room_id: str, client_id: str):
        await ws.accept()
        self.rooms[room_id][client_id] = ws
        await self.broadcast_to_room(room_id, {
            "type": "joined", "user": client_id,
            "members": list(self.rooms[room_id].keys())
        })

    async def leave(self, room_id: str, client_id: str):
        self.rooms[room_id].pop(client_id, None)
        if not self.rooms[room_id]:
            del self.rooms[room_id]  # empty room cleanup
        else:
            await self.broadcast_to_room(room_id, {
                "type": "left", "user": client_id
            })

    async def broadcast_to_room(self, room_id: str, message: dict):
        for ws in self.rooms.get(room_id, {}).values():
            try:
                await ws.send_json(message)
            except: pass

room_manager = RoomManager()

@router.websocket("/ws/room/{room_id}/{client_id}")
async def room_ws(ws: WebSocket, room_id: str, client_id: str):
    await room_manager.join(ws, room_id, client_id)
    try:
        while True:
            data = await ws.receive_json()
            await room_manager.broadcast_to_room(room_id, {
                "type": "message", "from": client_id,
                "text": data.get("text")
            })
    except WebSocketDisconnect:
        await room_manager.leave(room_id, client_id)
Frontend

Angular WebSocket
Service

The right way to use WebSockets in Angular โ€” as an injectable service with RxJS Observables, so any component can subscribe to messages.

WebSocket Service โ€” Core Architecture
ANGULAR
WebSocket Service
Component Usage
RxJS Pattern
websocket.service.tsTypeScript
import { Injectable, OnDestroy } from '@angular/core';
import { Subject, Observable, timer, EMPTY } from 'rxjs';
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
import { switchMap, retryWhen, delayWhen, tap, catchError } from 'rxjs/operators';

export interface WsMessage {
  type: string;
  [key: string]: any;
}

@Injectable({ providedIn: 'root' })
export class WebSocketService implements OnDestroy {
  private socket$!: WebSocketSubject<WsMessage>;
  private messagesSubject$ = new Subject<WsMessage>();

  // Observable that components subscribe to
  messages$ = this.messagesSubject$.asObservable();

  connect(url: string): void {
    if (this.socket$ && !this.socket$.closed) return;

    this.socket$ = webSocket({
      url,
      openObserver: {
        next: () => console.log('๐Ÿ”— WebSocket connected')
      },
      closeObserver: {
        next: () => console.log('๐Ÿ”Œ WebSocket disconnected')
      }
    });

    this.socket$.pipe(
      // Auto-reconnect on error (retryWhen deprecated โ†’ use retry in RxJS 7+)
      retryWhen(errors =>
        errors.pipe(
          tap(() => console.log('โš ๏ธ WS error, reconnecting in 2s...')),
          delayWhen(() => timer(2000))
        )
      )
    ).subscribe({
      next: (msg) => this.messagesSubject$.next(msg),
      error: (err) => console.error('WS error:', err),
    });
  }

  send(message: WsMessage): void {
    if (this.socket$ && !this.socket$.closed) {
      this.socket$.next(message);
    } else {
      console.warn('WebSocket not connected');
    }
  }

  disconnect(): void {
    this.socket$?.complete();
  }

  // Filter messages by type โ€” returns Observable for that type only
  on<T>(type: string): Observable<T> {
    return this.messages$.pipe(
      filter(msg => msg.type === type)
    ) as Observable<T>;
  }

  ngOnDestroy(): void {
    this.disconnect();
  }
}
chat.component.tsTypeScript
import { Component, OnInit, OnDestroy, inject } from '@angular/core';
import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';
import { WebSocketService } from '../services/websocket.service';

@Component({
  selector: 'app-chat',
  template: `
    <div *ngFor="let msg of messages">
      <strong>{{ msg.from }}:</strong> {{ msg.text }}
    </div>
    <input [(ngModel)]="newMessage" (keyup.enter)="send()" />
    <button (click)="send()">Send</button>
    <span>{{ onlineCount }} online</span>
  `
})
export class ChatComponent implements OnInit, OnDestroy {
  private ws = inject(WebSocketService);
  private destroy$ = new Subject<void>();

  messages: any[] = [];
  newMessage = '';
  onlineCount = 0;
  clientId = crypto.randomUUID().slice(0, 8);

  ngOnInit(): void {
    // Connect to WebSocket
    this.ws.connect(`ws://localhost:8000/ws/chat/${this.clientId}`);

    // Listen for chat messages
    this.ws.on('message').pipe(
      takeUntil(this.destroy$)
    ).subscribe(msg => {
      this.messages.push(msg);
    });

    // Listen for user joins
    this.ws.on('user_joined').pipe(
      takeUntil(this.destroy$)
    ).subscribe(msg => {
      this.onlineCount = msg.online_count;
    });

    // Or subscribe to ALL messages
    this.ws.messages$.pipe(
      takeUntil(this.destroy$)
    ).subscribe(msg => {
      switch (msg.type) {
        case 'message': this.messages.push(msg); break;
        case 'user_joined': this.onlineCount = msg.online_count; break;
        case 'user_left': this.onlineCount--; break;
      }
    });
  }

  send(): void {
    if (!this.newMessage.trim()) return;
    this.ws.send({ type: 'message', text: this.newMessage });
    this.newMessage = '';
  }

  ngOnDestroy(): void {
    this.destroy$.next();    // unsubscribe from all observables
    this.destroy$.complete();
    this.ws.disconnect();
  }
}
websocket.service.ts โ€” full RxJS patternTypeScript
import { webSocket } from 'rxjs/webSocket'; import { retry, delay, share, filter, map } from 'rxjs/operators'; // Robust production-grade WS service with RxJS @Injectable({ providedIn: 'root' }) export class WsService { private readonly WS_URL = environment.wsUrl; // from environment.ts private socket$ = webSocket({ url: this.WS_URL, serializer: (msg) => JSON.stringify(msg), deserializer: ({data}) => JSON.parse(data), }).pipe( retry({ count: 5, delay: 2000 }), // retry 5 times, 2s apart share() // ONE shared connection, even if many components subscribe ); // Type-safe message filtering chatMessages$ = this.socket$.pipe( filter((m: any) => m.type === 'message'), map((m: any) => m as ChatMessage) ); notifications$ = this.socket$.pipe( filter((m: any) => m.type === 'notification') ); presence$ = this.socket$.pipe( filter((m: any) => ['user_joined', 'user_left'].includes(m.type)) ); send(msg: object): void { (this.socket$ as any).next(msg); } } // environment.ts export const environment = { production: false, apiUrl: 'http://localhost:8000', wsUrl: 'ws://localhost:8000/ws' // wss:// for production };
Full App

Live Chat App โ€”
End to End

Complete working chat: Angular frontend + FastAPI backend. Every file you need.

Complete Chat โ€” Backend + Frontend
FULLSTACK
FastAPI Backend
Angular Service
Angular Component
HTML Template
app/routers/chat.pyPython
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
from datetime import datetime
import json

router = APIRouter()

# โ”€โ”€ Connection Manager โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
class ChatManager:
    def __init__(self):
        self.connections: dict[str, WebSocket] = {}
        self.message_history: list = []  # last 50 messages

    async def connect(self, ws: WebSocket, username: str):
        await ws.accept()
        self.connections[username] = ws
        # Send history to new joiner
        await ws.send_json({
            "type": "history",
            "messages": self.message_history[-50:]
        })
        # Announce to everyone
        await self.broadcast({
            "type": "system",
            "text": f"{username} joined",
            "online": list(self.connections.keys())
        })

    def disconnect(self, username: str):
        self.connections.pop(username, None)

    async def broadcast(self, msg: dict):
        msg["timestamp"] = datetime.utcnow().isoformat()
        if msg.get("type") == "message":
            self.message_history.append(msg)
        stale = []
        for uname, ws in self.connections.items():
            try:
                await ws.send_json(msg)
            except:
                stale.append(uname)
        for u in stale: self.disconnect(u)

chat = ChatManager()

@router.websocket("/ws/chat")
async def chat_endpoint(ws: WebSocket, username: str):
    await chat.connect(ws, username)
    try:
        while True:
            data = await ws.receive_json()
            action = data.get("type")

            if action == "message":
                await chat.broadcast({
                    "type": "message",
                    "from": username,
                    "text": data.get("text", "")[:500]  # limit length
                })

            elif action == "typing":
                await chat.broadcast({
                    "type": "typing", "user": username
                })

    except WebSocketDisconnect:
        chat.disconnect(username)
        await chat.broadcast({
            "type": "system",
            "text": f"{username} left",
            "online": list(chat.connections.keys())
        })
chat.service.tsTypeScript
import { Injectable, OnDestroy } from '@angular/core'; import { Subject, BehaviorSubject, Observable } from 'rxjs'; import { webSocket, WebSocketSubject } from 'rxjs/webSocket'; import { retry, takeUntil, filter, share } from 'rxjs/operators'; export interface ChatMessage { type: string; from?: string; text?: string; timestamp?: string; online?: string[]; } @Injectable({ providedIn: 'root' }) export class ChatService implements OnDestroy { private destroy$ = new Subject<void>(); private socket$!: WebSocketSubject<ChatMessage>; messages$ = new Subject<ChatMessage>(); onlineUsers$ = new BehaviorSubject<string[]>([]); isConnected$ = new BehaviorSubject<boolean>(false); connect(username: string): void { const wsUrl = `ws://localhost:8000/ws/chat?username=${username}`; this.socket$ = webSocket({ url: wsUrl, openObserver: { next: () => this.isConnected$.next(true) }, closeObserver: { next: () => this.isConnected$.next(false) } }); this.socket$.pipe( retry({ count: 10, delay: 2000 }), takeUntil(this.destroy$) ).subscribe(msg => { this.messages$.next(msg); if (msg.online) this.onlineUsers$.next(msg.online); }); } sendMessage(text: string): void { this.socket$.next({ type: 'message', text }); } sendTyping(): void { this.socket$.next({ type: 'typing' }); } ngOnDestroy(): void { this.destroy$.next(); this.socket$.complete(); } }
chat.component.tsTypeScript
@Component({ selector: 'app-chat', templateUrl: './chat.component.html' })
export class ChatComponent implements OnInit, OnDestroy {
  private chat = inject(ChatService);
  private destroy$ = new Subject<void>();

  messages: ChatMessage[] = [];
  newMsg = '';
  username = '';
  joined = false;
  typingUser = '';
  onlineUsers: string[] = [];
  isConnected$ = this.chat.isConnected$;
  onlineUsers$ = this.chat.onlineUsers$;

  ngOnInit(): void {
    this.chat.messages$.pipe(
      takeUntil(this.destroy$)
    ).subscribe(msg => {
      switch (msg.type) {
        case 'message':
        case 'system':
          this.messages.push(msg);
          break;
        case 'history':
          this.messages = (msg as any).messages;
          break;
        case 'typing':
          this.typingUser = msg.from!;
          setTimeout(() => this.typingUser = '', 2000);
          break;
      }
    });
  }

  join(): void {
    if (!this.username.trim()) return;
    this.chat.connect(this.username);
    this.joined = true;
  }

  send(): void {
    if (!this.newMsg.trim()) return;
    this.chat.sendMessage(this.newMsg);
    this.newMsg = '';
  }

  onTyping(): void { this.chat.sendTyping(); }

  ngOnDestroy(): void { this.destroy$.next(); }
}
chat.component.htmlAngular Template
<div *ngIf="!joined"> <input [(ngModel)]="username" placeholder="Your name" /> <button (click)="join()">Join Chat</button> </div> <div *ngIf="joined"> <<!-- Connection status --> <span [class.green]="isConnected$ | async"> {{ (isConnected$ | async) ? '๐ŸŸข Connected' : '๐Ÿ”ด Reconnecting...' }} </span> <<!-- Online users --> <div>Online: {{ onlineUsers$ | async | json }}</div> <<!-- Messages --> <div *ngFor="let msg of messages"> <ng-container [ngSwitch]="msg.type"> <div *ngSwitchCase="'message'"> <strong>{{ msg.from }}:</strong> {{ msg.text }} <small>{{ msg.timestamp | date:'HH:mm' }}</small> </div> <div *ngSwitchCase="'system'" class="system-msg"> โ„น๏ธ {{ msg.text }} </div> </ng-container> </div> <<!-- Typing indicator --> <div *ngIf="typingUser">{{ typingUser }} is typing...</div> <<!-- Input --> <input [(ngModel)]="newMsg" (keyup.enter)="send()" (input)="onTyping()" placeholder="Type a message..." /> <button (click)="send()">Send</button> </div>
Pattern

Server-Side Push
Notifications

Server pushes updates to clients โ€” no client polling required. Use for notifications, task completion, alerts.

SSE (simpler) + WebSocket Notifications
PUSH
FastAPI SSE
FastAPI WS Notify
Angular SSE
app/routers/sse.py โ€” Server-Sent Events (one-way push)Python
from fastapi import APIRouter from fastapi.responses import StreamingResponse import asyncio, json router = APIRouter() # SSE = one-way push (server โ†’ client only). Simpler than WebSocket. # Use for: notifications, progress bars, live logs, dashboards @router.get("/events") async def event_stream(request: Request, current_user = Depends(get_current_user)): async def generate(): try: while True: # Check if client disconnected if await request.is_disconnected(): break # Fetch pending notifications for this user notifications = await get_notifications(current_user.id) for notif in notifications: # SSE format: "data: {json}\n\n" yield f"data: {json.dumps(notif)}\n\n" await mark_sent(notif['id']) # Heartbeat every 30s to keep connection alive yield ": heartbeat\n\n" await asyncio.sleep(5) except asyncio.CancelledError: pass return StreamingResponse( generate(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "X-Accel-Buffering": "no", # disable Nginx buffering! "Connection": "keep-alive" } )
ws_notify.py โ€” push notifications via WebSocketPython
# Global registry: user_id โ†’ websocket (for pushing to specific users) user_sockets: Dict[int, WebSocket] = {} @router.websocket("/ws/notifications") async def notification_ws(ws: WebSocket, user: User = Depends(get_ws_user)): await ws.accept() user_sockets[user.id] = ws try: while True: # Keep alive โ€” client can ping data = await ws.receive_text() if data == "ping": await ws.send_text("pong") except WebSocketDisconnect: user_sockets.pop(user.id, None) # Call this from ANY route to push to specific user async def push_to_user(user_id: int, notification: dict): ws = user_sockets.get(user_id) if ws: try: await ws.send_json(notification) except: user_sockets.pop(user_id, None) # Example: push notification when order is shipped @router.post("/orders/{order_id}/ship") async def ship_order(order_id: int, db: DBSession, admin: AdminOnly): order = ship_order_in_db(db, order_id) # Push notification to the customer in real-time! await push_to_user(order.user_id, { "type": "notification", "title": "Your order shipped! ๐Ÿšš", "order_id": order_id }) return order
sse.service.ts โ€” Angular SSE clientTypeScript
import { Injectable } from '@angular/core'; import { Observable, fromEvent, Subject } from 'rxjs'; import { map, takeUntil } from 'rxjs/operators'; @Injectable({ providedIn: 'root' }) export class SseService { private eventSource?: EventSource; connect(url: string): Observable<any> { return new Observable(observer => { this.eventSource = new EventSource(url); this.eventSource.onmessage = (event) => { observer.next(JSON.parse(event.data)); }; this.eventSource.onerror = (err) => { // SSE auto-reconnects by default! console.error('SSE error, will retry...', err); }; // Cleanup when unsubscribed return () => this.eventSource?.close(); }); } disconnect(): void { this.eventSource?.close(); } } // Usage in component: // this.sseService.connect('http://localhost:8000/events') // .pipe(takeUntil(this.destroy$)) // .subscribe(notification => this.notifications.push(notification));
ML + Live Data

Live Dashboard &
ML Streaming

Stream ML predictions, live sensor data, or real-time analytics โ€” perfect for your ML learning combined with Angular.

Stream ML Predictions in Real-Time
ML + WS
FastAPI ML Stream
Angular Dashboard
app/routers/ml_stream.pyPython
from fastapi import WebSocket, WebSocketDisconnect import asyncio, numpy as np # Load model once at startup (not per-request!) model = load_ml_model() @router.websocket("/ws/predict") async def predict_stream(ws: WebSocket): await ws.accept() try: while True: # Receive features from Angular payload = await ws.receive_json() features = np.array(payload["features"]) # Run ML prediction (offload to thread) loop = asyncio.get_event_loop() result = await loop.run_in_executor( None, model.predict, features.reshape(1, -1) ) await ws.send_json({ "prediction": result.tolist()[0], "confidence": float(result.max()), "request_id": payload.get("request_id") }) except WebSocketDisconnect: pass # Stream live data to dashboard every second @router.websocket("/ws/dashboard") async def dashboard_stream(ws: WebSocket): await ws.accept() try: while True: # Collect metrics from DB/Redis metrics = await asyncio.gather( get_active_users(), get_revenue_today(), get_error_rate(), get_prediction_accuracy() ) await ws.send_json({ "type": "metrics", "active_users": metrics[0], "revenue": metrics[1], "error_rate": metrics[2], "model_accuracy": metrics[3], "timestamp": datetime.utcnow().isoformat() }) await asyncio.sleep(1) # push every second except WebSocketDisconnect: pass
dashboard.component.tsTypeScript
@Component({ selector: 'app-dashboard', templateUrl: './dashboard.component.html' }) export class DashboardComponent implements OnInit, OnDestroy { private ws = inject(WebSocketService); private destroy$ = new Subject<void>(); metrics = { activeUsers: 0, revenue: 0, errorRate: 0, accuracy: 0 }; history: any[] = []; // for chart ngOnInit(): void { this.ws.connect('ws://localhost:8000/ws/dashboard'); this.ws.messages$.pipe( takeUntil(this.destroy$) ).subscribe((msg: any) => { this.metrics = { activeUsers: msg.active_users, revenue: msg.revenue, errorRate: msg.error_rate, accuracy: msg.model_accuracy }; // Keep last 60 points for sparkline chart this.history.push({ t: new Date(msg.timestamp), ...msg }); if (this.history.length > 60) this.history.shift(); }); } // ML prediction on demand predict(features: number[]): void { this.ws.send({ type: 'predict', features, request_id: Date.now() }); } ngOnDestroy(): void { this.destroy$.next(); } }
Security

Auth & JWT over
WebSocket

WebSockets can't send Authorization headers. Here's how to authenticate WebSocket connections properly.

JWT Authentication for WebSocket Connections
AUTH
WebSocket's connection request doesn't support custom headers. So you can't send "Authorization: Bearer xxx". Solutions: 1) Send token as query param 2) Send token as first message 3) Use cookie. Option 2 is most secure for production.
auth_ws.py โ€” 3 methodsPython
# โ”€โ”€ METHOD 1: Token in query string (simple, ok for internal tools) โ”€โ”€
@router.websocket("/ws")
async def ws_query_auth(ws: WebSocket, token: str, db: DBSession):
    user = verify_token(token, db)  # validate BEFORE accept()
    if not user:
        await ws.close(code=1008)  # 1008 = Policy Violation
        return
    await ws.accept()
    # ws URL: ws://localhost:8000/ws?token=eyJhbGci...

# โ”€โ”€ METHOD 2: First message is auth token (most secure) โ”€โ”€
@router.websocket("/ws/secure")
async def ws_msg_auth(ws: WebSocket, db: DBSession):
    await ws.accept()  # accept connection first
    try:
        # Expect auth token as first message (within 5 seconds)
        auth_msg = await asyncio.wait_for(ws.receive_json(), timeout=5.0)
        if auth_msg.get("type") != "auth":
            await ws.send_json({"error": "First message must be auth"})
            await ws.close()
            return

        user = verify_token(auth_msg["token"], db)
        if not user:
            await ws.send_json({"error": "Invalid token"})
            await ws.close(1008)
            return

        await ws.send_json({"type": "auth_ok", "user": user.name})

        # Now handle real messages
        while True:
            data = await ws.receive_json()
            # user is authenticated, process message

    except asyncio.TimeoutError:
        await ws.close(1008)
    except WebSocketDisconnect:
        pass
Angular โ€” authenticate WebSocket connectionTypeScript
// Method 1 โ€” token in URL (not ideal, logs may capture it) const token = localStorage.getItem('access_token'); this.ws.connect(`ws://localhost:8000/ws?token=${token}`); // Method 2 โ€” token as first message (preferred) @Injectable({ providedIn: 'root' }) export class AuthWsService { private socket$!: WebSocketSubject<any>; connect(): void { this.socket$ = webSocket('ws://localhost:8000/ws/secure'); // Send auth as FIRST message immediately after open this.socket$.pipe( take(1) // wait for connection open ).subscribe(() => { this.socket$.next({ type: 'auth', token: this.authService.getToken() }); }); } }
Reliability

Reconnection &
Error Handling

Production-Ready Auto-Reconnect Service
RECONNECT
robust-ws.service.tsTypeScript
@Injectable({ providedIn: 'root' }) export class RobustWsService { private socket?: WebSocket; // native WebSocket for max control private reconnectAttempts = 0; private maxReconnect = 10; private pingInterval?: any; private messageQueue: any[] = []; messages$ = new Subject<any>(); status$ = new BehaviorSubject<'connected'|'disconnected'|'reconnecting'>('disconnected'); connect(url: string): void { this.url = url; this._connect(); } private _connect(): void { this.socket = new WebSocket(this.url); this.socket.onopen = () => { this.reconnectAttempts = 0; this.status$.next('connected'); // Flush queued messages while (this.messageQueue.length) { this.socket!.send(JSON.stringify(this.messageQueue.shift())); } // Start heartbeat ping every 30s this.pingInterval = setInterval(() => { if (this.socket?.readyState === WebSocket.OPEN) { this.socket.send('ping'); } }, 30000); }; this.socket.onmessage = ({data}) => { if (data === 'pong') return; // heartbeat response this.messages$.next(JSON.parse(data)); }; this.socket.onclose = (event) => { clearInterval(this.pingInterval); if (event.code === 1000) return; // intentional close, don't reconnect this._scheduleReconnect(); }; this.socket.onerror = () => { this.socket?.close(); }; } private _scheduleReconnect(): void { if (this.reconnectAttempts >= this.maxReconnect) { this.status$.next('disconnected'); console.error('Max reconnect attempts reached'); return; } this.status$.next('reconnecting'); // Exponential backoff: 1s, 2s, 4s, 8s, 16s... (max 30s) const delay = Math.min(1000 * Math.pow(2, this.reconnectAttempts), 30000); this.reconnectAttempts++; setTimeout(() => this._connect(), delay); } send(msg: object): void { if (this.socket?.readyState === WebSocket.OPEN) { this.socket.send(JSON.stringify(msg)); } else { // Queue the message, send when reconnected this.messageQueue.push(msg); } } disconnect(): void { this.socket?.close(1000); // 1000 = Normal closure (won't trigger reconnect) } }
RxJS

RxJS WebSocket โ€”
The Complete Pattern

Combine RxJS operators with WebSockets for powerful reactive real-time data pipelines.

RxJS webSocket() โ€” Power Patterns
RxJS
rxjs-ws-patterns.tsTypeScript
import { webSocket } from 'rxjs/webSocket'; import { retry, debounceTime, throttleTime, distinctUntilChanged, bufferTime, scan, filter, map, share, catchError, EMPTY } from 'rxjs/operators'; const socket$ = webSocket('ws://localhost:8000/ws/dashboard').pipe( retry({ count: 5, delay: 2000 }), share() // IMPORTANT: share prevents multiple WS connections per subscription ); // โ”€โ”€ Pattern 1: Rate limit UI updates โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ // Server sends 100 updates/sec, but only re-render every 200ms const throttledMetrics$ = socket$.pipe( filter((m: any) => m.type === 'metric'), throttleTime(200) // only take first per 200ms window ); // โ”€โ”€ Pattern 2: Accumulate into array (for chart history) โ”€ const chartData$ = socket$.pipe( filter((m: any) => m.type === 'metric'), scan((history: any[], msg) => { const next = [...history, msg]; return next.slice(-60); // keep last 60 points }, []) ); // โ”€โ”€ Pattern 3: Search with debounce via WS โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ // User types โ†’ send to WS โ†’ debounce โ†’ get results searchQuery$ = new Subject<string>(); searchQuery$.pipe( debounceTime(300), // wait 300ms after user stops typing distinctUntilChanged(), // skip if same as last value tap(query => socket$.next({ type: 'search', query })) // send to WS ).subscribe(); const searchResults$ = socket$.pipe( filter((m: any) => m.type === 'search_results'), map((m: any) => m.results) ); // โ”€โ”€ Pattern 4: Group messages into batches โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ const batched$ = socket$.pipe( bufferTime(500), // collect all messages in 500ms windows filter(batch => batch.length > 0) // skip empty windows ); // โ”€โ”€ Pattern 5: Handle errors gracefully โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ const safe$ = socket$.pipe( catchError(err => { console.error('WS error:', err); return EMPTY; // complete gracefully }) );
WebSocket Best Practices
  • Always unsubscribe in ngOnDestroy using takeUntil(destroy$) or async pipe
  • Use share() operator so multiple components don't open multiple connections
  • Send structured messages with a "type" field โ€” makes routing easy
  • Implement heartbeat (ping/pong) โ€” some proxies close idle connections after 60s
  • Use wss:// (WebSocket Secure = WebSocket over TLS) in production
  • Use exponential backoff for reconnection โ€” don't hammer the server