Skip to content

Real-time Communication

Introduction to Real-time Communication

Real-time communication enables instantaneous data exchange between mobile clients and servers, providing users with immediate updates and interactive experiences. This includes chat messaging, live updates, collaborative editing, gaming, and live streaming features.

System Architecture

Connection Management

Message Processing Flow

Performance Optimization

Core Technologies and Protocols

WebSocket Implementation

Native iOS WebSocket

swift
import Foundation
import Network

class WebSocketManager: NSObject {
    private var webSocketTask: URLSessionWebSocketTask?
    private var urlSession: URLSession?
    private let connectionQueue = DispatchQueue(label: "websocket.connection")
    
    private var isConnected = false
    private var reconnectAttempts = 0
    private let maxReconnectAttempts = 5
    private var reconnectTimer: Timer?
    
    weak var delegate: WebSocketManagerDelegate?
    
    func connect(to url: URL) {
        connectionQueue.async { [weak self] in
            self?.establishConnection(to: url)
        }
    }
    
    private func establishConnection(to url: URL) {
        let config = URLSessionConfiguration.default
        config.timeoutIntervalForRequest = 30
        config.timeoutIntervalForResource = 30
        
        urlSession = URLSession(configuration: config, delegate: self, delegateQueue: nil)
        webSocketTask = urlSession?.webSocketTask(with: url)
        
        webSocketTask?.resume()
        listen()
        
        // Send ping to keep connection alive
        scheduleHeartbeat()
    }
    
    private func listen() {
        webSocketTask?.receive { [weak self] result in
            switch result {
            case .success(let message):
                self?.handleMessage(message)
                self?.listen() // Continue listening
                
            case .failure(let error):
                self?.handleError(error)
            }
        }
    }
    
    private func handleMessage(_ message: URLSessionWebSocketTask.Message) {
        switch message {
        case .string(let text):
            DispatchQueue.main.async { [weak self] in
                self?.delegate?.didReceiveMessage(text)
            }
            
        case .data(let data):
            if let text = String(data: data, encoding: .utf8) {
                DispatchQueue.main.async { [weak self] in
                    self?.delegate?.didReceiveMessage(text)
                }
            }
            
        @unknown default:
            break
        }
    }
    
    func sendMessage(_ message: String) {
        let webSocketMessage = URLSessionWebSocketTask.Message.string(message)
        webSocketTask?.send(webSocketMessage) { [weak self] error in
            if let error = error {
                print("WebSocket send error: \(error)")
                self?.delegate?.didEncounterError(error)
            }
        }
    }
    
    func sendData(_ data: Data) {
        let webSocketMessage = URLSessionWebSocketTask.Message.data(data)
        webSocketTask?.send(webSocketMessage) { [weak self] error in
            if let error = error {
                print("WebSocket send error: \(error)")
                self?.delegate?.didEncounterError(error)
            }
        }
    }
    
    private func scheduleHeartbeat() {
        DispatchQueue.main.async { [weak self] in
            self?.reconnectTimer?.invalidate()
            self?.reconnectTimer = Timer.scheduledTimer(withTimeInterval: 30.0, repeats: true) { _ in
                self?.sendPing()
            }
        }
    }
    
    private func sendPing() {
        webSocketTask?.sendPing { [weak self] error in
            if let error = error {
                print("WebSocket ping failed: \(error)")
                self?.handleError(error)
            }
        }
    }
    
    private func handleError(_ error: Error) {
        print("WebSocket error: \(error)")
        isConnected = false
        
        DispatchQueue.main.async { [weak self] in
            self?.delegate?.didEncounterError(error)
        }
        
        attemptReconnection()
    }
    
    private func attemptReconnection() {
        guard reconnectAttempts < maxReconnectAttempts else {
            print("Max reconnection attempts reached")
            return
        }
        
        reconnectAttempts += 1
        let delay = TimeInterval(min(30, pow(2.0, Double(reconnectAttempts))))
        
        DispatchQueue.main.asyncAfter(deadline: .now() + delay) { [weak self] in
            guard let self = self,
                  let url = self.webSocketTask?.originalRequest?.url else { return }
            
            print("Attempting reconnection #\(self.reconnectAttempts)")
            self.connect(to: url)
        }
    }
    
    func disconnect() {
        reconnectTimer?.invalidate()
        webSocketTask?.cancel(with: .goingAway, reason: nil)
        urlSession?.invalidateAndCancel()
        isConnected = false
        reconnectAttempts = 0
    }
}

extension WebSocketManager: URLSessionWebSocketDelegate {
    func urlSession(_ session: URLSession, webSocketTask: URLSessionWebSocketTask, didOpenWithProtocol protocol: String?) {
        print("WebSocket connected")
        isConnected = true
        reconnectAttempts = 0
        
        DispatchQueue.main.async { [weak self] in
            self?.delegate?.didConnect()
        }
    }
    
    func urlSession(_ session: URLSession, webSocketTask: URLSessionWebSocketTask, didCloseWith closeCode: URLSessionWebSocketTask.CloseCode, reason: Data?) {
        print("WebSocket disconnected with code: \(closeCode)")
        isConnected = false
        
        DispatchQueue.main.async { [weak self] in
            self?.delegate?.didDisconnect()
        }
        
        if closeCode != .goingAway {
            attemptReconnection()
        }
    }
}

protocol WebSocketManagerDelegate: AnyObject {
    func didConnect()
    func didDisconnect()
    func didReceiveMessage(_ message: String)
    func didEncounterError(_ error: Error)
}

Android WebSocket with OkHttp

kotlin
import okhttp3.*
import okio.ByteString
import org.json.JSONObject
import java.util.concurrent.TimeUnit

class WebSocketManager(private val listener: WebSocketListener) {
    private var webSocket: WebSocket? = null
    private val client = OkHttpClient.Builder()
        .readTimeout(30, TimeUnit.SECONDS)
        .writeTimeout(30, TimeUnit.SECONDS)
        .pingInterval(30, TimeUnit.SECONDS)
        .build()
    
    private var isConnected = false
    private var reconnectAttempts = 0
    private val maxReconnectAttempts = 5
    private var reconnectHandler = android.os.Handler(android.os.Looper.getMainLooper())
    
    interface WebSocketListener {
        fun onConnected()
        fun onDisconnected()
        fun onMessageReceived(message: String)
        fun onError(error: Throwable)
    }
    
    fun connect(url: String) {
        val request = Request.Builder()
            .url(url)
            .addHeader("Authorization", "Bearer ${getAuthToken()}")
            .build()
        
        webSocket = client.newWebSocket(request, object : okhttp3.WebSocketListener() {
            override fun onOpen(webSocket: WebSocket, response: Response) {
                isConnected = true
                reconnectAttempts = 0
                listener.onConnected()
            }
            
            override fun onMessage(webSocket: WebSocket, text: String) {
                listener.onMessageReceived(text)
            }
            
            override fun onMessage(webSocket: WebSocket, bytes: ByteString) {
                listener.onMessageReceived(bytes.utf8())
            }
            
            override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
                isConnected = false
                listener.onDisconnected()
                
                if (code != 1000) { // Not normal closure
                    attemptReconnection(url)
                }
            }
            
            override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
                isConnected = false
                listener.onError(t)
                attemptReconnection(url)
            }
        })
    }
    
    fun sendMessage(message: String): Boolean {
        return webSocket?.send(message) ?: false
    }
    
    fun sendMessage(data: ByteArray): Boolean {
        return webSocket?.send(ByteString.of(*data)) ?: false
    }
    
    private fun attemptReconnection(url: String) {
        if (reconnectAttempts >= maxReconnectAttempts) {
            listener.onError(Exception("Max reconnection attempts reached"))
            return
        }
        
        reconnectAttempts++
        val delay = (kotlin.math.min(30, kotlin.math.pow(2.0, reconnectAttempts.toDouble())) * 1000).toLong()
        
        reconnectHandler.postDelayed({
            connect(url)
        }, delay)
    }
    
    fun disconnect() {
        webSocket?.close(1000, "User disconnect")
        isConnected = false
        reconnectAttempts = 0
    }
    
    private fun getAuthToken(): String {
        // Return your authentication token
        return "your_auth_token"
    }
}

// Usage
class ChatActivity : AppCompatActivity(), WebSocketManager.WebSocketListener {
    private lateinit var webSocketManager: WebSocketManager
    
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        
        webSocketManager = WebSocketManager(this)
        webSocketManager.connect("wss://api.yourapp.com/chat")
    }
    
    override fun onConnected() {
        runOnUiThread {
            // Update UI to show connected state
            updateConnectionStatus(true)
        }
    }
    
    override fun onDisconnected() {
        runOnUiThread {
            // Update UI to show disconnected state
            updateConnectionStatus(false)
        }
    }
    
    override fun onMessageReceived(message: String) {
        runOnUiThread {
            try {
                val messageObj = JSONObject(message)
                when (messageObj.getString("type")) {
                    "chat_message" -> handleChatMessage(messageObj)
                    "user_typing" -> handleTypingIndicator(messageObj)
                    "presence_update" -> handlePresenceUpdate(messageObj)
                }
            } catch (e: Exception) {
                onError(e)
            }
        }
    }
    
    override fun onError(error: Throwable) {
        runOnUiThread {
            // Show error message to user
            showErrorMessage(error.message ?: "Connection error")
        }
    }
    
    private fun sendChatMessage(text: String) {
        val message = JSONObject().apply {
            put("type", "chat_message")
            put("text", text)
            put("timestamp", System.currentTimeMillis())
            put("user_id", getCurrentUserId())
        }
        
        webSocketManager.sendMessage(message.toString())
    }
}

React Native Real-time Implementation

Using Socket.IO Client

typescript
import io, { Socket } from 'socket.io-client';
import { NetInfo } from '@react-native-async-storage/async-storage';

class RealTimeManager {
  private socket: Socket | null = null;
  private connectionOptions: any;
  private listeners: Map<string, Function[]> = new Map();
  private isConnected = false;
  private reconnectAttempts = 0;
  private maxReconnectAttempts = 5;

  constructor(url: string, options: any = {}) {
    this.connectionOptions = {
      transports: ['websocket'],
      autoConnect: false,
      reconnection: true,
      reconnectionAttempts: this.maxReconnectAttempts,
      reconnectionDelay: 1000,
      timeout: 20000,
      ...options,
    };

    this.socket = io(url, this.connectionOptions);
    this.setupEventHandlers();
    this.setupNetworkListener();
  }

  private setupEventHandlers(): void {
    if (!this.socket) return;

    this.socket.on('connect', () => {
      console.log('Socket connected');
      this.isConnected = true;
      this.reconnectAttempts = 0;
      this.emit('connected');
    });

    this.socket.on('disconnect', (reason: string) => {
      console.log('Socket disconnected:', reason);
      this.isConnected = false;
      this.emit('disconnected', reason);
    });

    this.socket.on('connect_error', (error: Error) => {
      console.error('Socket connection error:', error);
      this.emit('error', error);
    });

    this.socket.on('reconnect', (attemptNumber: number) => {
      console.log('Socket reconnected after', attemptNumber, 'attempts');
      this.emit('reconnected', attemptNumber);
    });

    this.socket.on('reconnect_error', (error: Error) => {
      console.error('Socket reconnection error:', error);
      this.reconnectAttempts++;
      this.emit('reconnect_error', error);
    });
  }

  private setupNetworkListener(): void {
    NetInfo.addEventListener(state => {
      if (state.isConnected && !this.isConnected) {
        // Network is back, attempt to reconnect
        this.connect();
      } else if (!state.isConnected && this.isConnected) {
        // Network is lost
        this.emit('network_lost');
      }
    });
  }

  connect(): void {
    if (this.socket && !this.isConnected) {
      this.socket.connect();
    }
  }

  disconnect(): void {
    if (this.socket) {
      this.socket.disconnect();
      this.isConnected = false;
    }
  }

  on(event: string, callback: Function): void {
    if (!this.listeners.has(event)) {
      this.listeners.set(event, []);
    }
    this.listeners.get(event)!.push(callback);

    // Also register with socket.io
    if (this.socket) {
      this.socket.on(event, callback);
    }
  }

  off(event: string, callback?: Function): void {
    if (callback) {
      const listeners = this.listeners.get(event) || [];
      const index = listeners.indexOf(callback);
      if (index > -1) {
        listeners.splice(index, 1);
      }
      
      if (this.socket) {
        this.socket.off(event, callback);
      }
    } else {
      this.listeners.delete(event);
      if (this.socket) {
        this.socket.off(event);
      }
    }
  }

  emit(event: string, data?: any): void {
    if (this.socket && this.isConnected) {
      this.socket.emit(event, data);
    } else {
      // Queue the event for when connection is restored
      this.queueEvent(event, data);
    }
  }

  private queueEvent(event: string, data?: any): void {
    // Implement event queuing for offline scenarios
    const queuedEvents = this.getQueuedEvents();
    queuedEvents.push({ event, data, timestamp: Date.now() });
    this.saveQueuedEvents(queuedEvents);
  }

  private getQueuedEvents(): Array<{ event: string; data: any; timestamp: number }> {
    // Implement persistent storage for queued events
    return [];
  }

  private saveQueuedEvents(events: Array<{ event: string; data: any; timestamp: number }>): void {
    // Implement persistent storage
  }

  joinRoom(roomId: string): void {
    this.emit('join_room', { roomId });
  }

  leaveRoom(roomId: string): void {
    this.emit('leave_room', { roomId });
  }

  sendMessage(roomId: string, message: any): void {
    const messageData = {
      roomId,
      message,
      timestamp: Date.now(),
      messageId: this.generateMessageId(),
    };

    this.emit('send_message', messageData);
  }

  private generateMessageId(): string {
    return `${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
  }

  getConnectionStatus(): boolean {
    return this.isConnected;
  }
}

// Usage in React Native component
import React, { useEffect, useState } from 'react';
import { View, Text, TextInput, Button, FlatList } from 'react-native';

const ChatScreen: React.FC = () => {
  const [realTimeManager] = useState(() => new RealTimeManager('ws://localhost:3000'));
  const [messages, setMessages] = useState<any[]>([]);
  const [messageText, setMessageText] = useState('');
  const [isConnected, setIsConnected] = useState(false);

  useEffect(() => {
    realTimeManager.on('connected', () => {
      setIsConnected(true);
      realTimeManager.joinRoom('general');
    });

    realTimeManager.on('disconnected', () => {
      setIsConnected(false);
    });

    realTimeManager.on('new_message', (message: any) => {
      setMessages(prev => [...prev, message]);
    });

    realTimeManager.connect();

    return () => {
      realTimeManager.disconnect();
    };
  }, []);

  const sendMessage = () => {
    if (messageText.trim() && isConnected) {
      realTimeManager.sendMessage('general', {
        text: messageText,
        user: 'current_user',
      });
      setMessageText('');
    }
  };

  return (
    <View style={{ flex: 1, padding: 20 }}>
      <Text style={{ color: isConnected ? 'green' : 'red' }}>
        Status: {isConnected ? 'Connected' : 'Disconnected'}
      </Text>
      
      <FlatList
        data={messages}
        keyExtractor={(item, index) => index.toString()}
        renderItem={({ item }) => (
          <View style={{ padding: 10, borderBottomWidth: 1 }}>
            <Text>{item.user}: {item.text}</Text>
          </View>
        )}
      />
      
      <View style={{ flexDirection: 'row', marginTop: 20 }}>
        <TextInput
          style={{ flex: 1, borderWidth: 1, padding: 10 }}
          value={messageText}
          onChangeText={setMessageText}
          placeholder="Type a message..."
        />
        <Button title="Send" onPress={sendMessage} />
      </View>
    </View>
  );
};

Flutter Real-time Implementation

Using WebSocket and Stream Controllers

dart
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'package:flutter/foundation.dart';

class RealTimeManager {
  WebSocket? _socket;
  Timer? _heartbeatTimer;
  Timer? _reconnectTimer;
  
  bool _isConnected = false;
  int _reconnectAttempts = 0;
  final int _maxReconnectAttempts = 5;
  
  final String _url;
  final Map<String, String> _headers;
  
  // Stream controllers for different event types
  final StreamController<bool> _connectionController = StreamController<bool>.broadcast();
  final StreamController<Map<String, dynamic>> _messageController = StreamController<Map<String, dynamic>>.broadcast();
  final StreamController<String> _errorController = StreamController<String>.broadcast();
  
  // Streams
  Stream<bool> get connectionStream => _connectionController.stream;
  Stream<Map<String, dynamic>> get messageStream => _messageController.stream;
  Stream<String> get errorStream => _errorController.stream;
  
  RealTimeManager(this._url, {Map<String, String>? headers}) 
      : _headers = headers ?? {};
  
  Future<void> connect() async {
    try {
      _socket = await WebSocket.connect(_url, headers: _headers);
      _isConnected = true;
      _reconnectAttempts = 0;
      
      _connectionController.add(true);
      _startHeartbeat();
      _listenToMessages();
      
      print('WebSocket connected');
    } catch (e) {
      print('WebSocket connection failed: $e');
      _handleConnectionError(e.toString());
    }
  }
  
  void _listenToMessages() {
    _socket?.listen(
      (data) {
        try {
          final message = jsonDecode(data);
          _messageController.add(message);
        } catch (e) {
          print('Error parsing message: $e');
        }
      },
      onError: (error) {
        print('WebSocket error: $error');
        _handleConnectionError(error.toString());
      },
      onDone: () {
        print('WebSocket connection closed');
        _isConnected = false;
        _connectionController.add(false);
        _attemptReconnection();
      },
    );
  }
  
  void _startHeartbeat() {
    _heartbeatTimer?.cancel();
    _heartbeatTimer = Timer.periodic(const Duration(seconds: 30), (timer) {
      if (_isConnected) {
        _sendHeartbeat();
      }
    });
  }
  
  void _sendHeartbeat() {
    sendMessage({
      'type': 'ping',
      'timestamp': DateTime.now().millisecondsSinceEpoch,
    });
  }
  
  void _handleConnectionError(String error) {
    _isConnected = false;
    _connectionController.add(false);
    _errorController.add(error);
    _attemptReconnection();
  }
  
  void _attemptReconnection() {
    if (_reconnectAttempts >= _maxReconnectAttempts) {
      _errorController.add('Max reconnection attempts reached');
      return;
    }
    
    _reconnectAttempts++;
    final delay = Duration(seconds: math.min(30, math.pow(2, _reconnectAttempts).toInt()));
    
    _reconnectTimer?.cancel();
    _reconnectTimer = Timer(delay, () {
      print('Attempting reconnection #$_reconnectAttempts');
      connect();
    });
  }
  
  void sendMessage(Map<String, dynamic> message) {
    if (_isConnected && _socket != null) {
      try {
        final jsonMessage = jsonEncode(message);
        _socket!.add(jsonMessage);
      } catch (e) {
        print('Error sending message: $e');
        _errorController.add('Failed to send message: $e');
      }
    } else {
      print('Cannot send message: WebSocket not connected');
      _queueMessage(message);
    }
  }
  
  final List<Map<String, dynamic>> _messageQueue = [];
  
  void _queueMessage(Map<String, dynamic> message) {
    _messageQueue.add(message);
    
    // Limit queue size
    if (_messageQueue.length > 100) {
      _messageQueue.removeAt(0);
    }
  }
  
  void _processQueuedMessages() {
    while (_messageQueue.isNotEmpty && _isConnected) {
      final message = _messageQueue.removeAt(0);
      try {
        sendMessage(message);
      } catch (error) {
        // Re-queue with increased retry count
        _queueMessage(message);
        break; // Stop processing if one fails
      }
    }
  }
  
  void joinRoom(String roomId) {
    sendMessage({
      'type': 'join_room',
      'roomId': roomId,
      'timestamp': DateTime.now().millisecondsSinceEpoch,
    });
  }
  
  void leaveRoom(String roomId) {
    sendMessage({
      'type': 'leave_room',
      'roomId': roomId,
      'timestamp': DateTime.now().millisecondsSinceEpoch,
    });
  }
  
  void sendChatMessage(String roomId, String text) {
    sendMessage({
      'type': 'chat_message',
      'roomId': roomId,
      'text': text,
      'messageId': _generateMessageId(),
      'timestamp': DateTime.now().millisecondsSinceEpoch,
    });
  }
  
  String _generateMessageId() {
    return '${DateTime.now().millisecondsSinceEpoch}_${math.Random().nextInt(999999)}';
  }
  
  void disconnect() {
    _heartbeatTimer?.cancel();
    _reconnectTimer?.cancel();
    _socket?.close();
    _isConnected = false;
    _connectionController.add(false);
  }
  
  void dispose() {
    disconnect();
    _connectionController.close();
    _messageController.close();
    _errorController.close();
  }
  
  bool get isConnected => _isConnected;
}

// Flutter Widget Usage
class ChatScreen extends StatefulWidget {
  @override
  _ChatScreenState createState() => _ChatScreenState();
}

class _ChatScreenState extends State<ChatScreen> {
  late RealTimeManager _realTimeManager;
  final List<ChatMessage> _messages = [];
  final TextEditingController _messageController = TextEditingController();
  StreamSubscription? _connectionSubscription;
  StreamSubscription? _messageSubscription;
  bool _isConnected = false;
  
  @override
  void initState() {
    super.initState();
    _initializeRealTime();
  }
  
  void _initializeRealTime() {
    _realTimeManager = RealTimeManager(
      'ws://localhost:3000',
      headers: {'Authorization': 'Bearer ${getAuthToken()}'},
    );
    
    _connectionSubscription = _realTimeManager.connectionStream.listen((isConnected) {
      setState(() {
        _isConnected = isConnected;
      });
      
      if (isConnected) {
        _realTimeManager.joinRoom('general');
      }
    });
    
    _messageSubscription = _realTimeManager.messageStream.listen((message) {
      _handleMessage(message);
    });
    
    _realTimeManager.connect();
  }
  
  void _handleMessage(Map<String, dynamic> message) {
    switch (message['type']) {
      case 'chat_message':
        setState(() {
          _messages.add(ChatMessage.fromJson(message));
        });
        break;
      case 'user_typing':
        _handleTypingIndicator(message);
        break;
      case 'presence_update':
        _handlePresenceUpdate(message);
        break;
    }
  }
  
  void _sendMessage() {
    final text = _messageController.text.trim();
    if (text.isNotEmpty && _isConnected) {
      _realTimeManager.sendChatMessage('general', text);
      _messageController.clear();
    }
  }
  
  @override
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(
        title: Text('Chat'),
        backgroundColor: _isConnected ? Colors.green : Colors.red,
        actions: [
          Icon(_isConnected ? Icons.cloud_done : Icons.cloud_off),
        ],
      ),
      body: Column(
        children: [
          Expanded(
            child: ListView.builder(
              itemCount: _messages.length,
              itemBuilder: (context, index) {
                final message = _messages[index];
                return ListTile(
                  title: Text(message.text),
                  subtitle: Text(message.sender),
                  trailing: Text(
                    DateFormat('HH:mm').format(message.timestamp),
                  ),
                );
              },
            ),
          ),
          Padding(
            padding: const EdgeInsets.all(8.0),
            child: Row(
              children: [
                Expanded(
                  child: TextField(
                    controller: _messageController,
                    decoration: InputDecoration(
                      hintText: 'Type a message...',
                      border: OutlineInputBorder(),
                    ),
                    onSubmitted: (_) => _sendMessage(),
                  ),
                ),
                SizedBox(width: 8),
                IconButton(
                  icon: Icon(Icons.send),
                  onPressed: _isConnected ? _sendMessage : null,
                ),
              ],
            ),
          ),
        ],
      ),
    );
  }
  
  @override
  void dispose() {
    _connectionSubscription?.cancel();
    _messageSubscription?.cancel();
    _realTimeManager.dispose();
    _messageController.dispose();
    super.dispose();
  }
}

class ChatMessage {
  final String id;
  final String text;
  final String sender;
  final DateTime timestamp;
  
  ChatMessage({
    required this.id,
    required this.text,
    required this.sender,
    required this.timestamp,
  });
  
  factory ChatMessage.fromJson(Map<String, dynamic> json) {
    return ChatMessage(
      id: json['messageId'],
      text: json['text'],
      sender: json['sender'],
      timestamp: DateTime.fromMillisecondsSinceEpoch(json['timestamp']),
    );
  }
}

Advanced Real-time Features

Presence and User Status

typescript
class PresenceManager {
  private realTimeManager: RealTimeManager;
  private presenceUpdateInterval: NodeJS.Timeout | null = null;
  private userStatuses: Map<string, UserStatus> = new Map();
  private listeners: Set<PresenceListener> = new Set();

  constructor(realTimeManager: RealTimeManager) {
    this.realTimeManager = realTimeManager;
    this.setupPresenceHandlers();
  }

  private setupPresenceHandlers(): void {
    this.realTimeManager.on('presence_update', (data: any) => {
      this.handlePresenceUpdate(data);
    });

    this.realTimeManager.on('user_joined', (data: any) => {
      this.handleUserJoined(data);
    });

    this.realTimeManager.on('user_left', (data: any) => {
      this.handleUserLeft(data);
    });
  }

  startPresenceUpdates(): void {
    // Send periodic presence updates
    this.presenceUpdateInterval = setInterval(() => {
      this.sendPresenceUpdate();
    }, 30000); // Every 30 seconds

    // Send initial presence
    this.sendPresenceUpdate();
  }

  stopPresenceUpdates(): void {
    if (this.presenceUpdateInterval) {
      clearInterval(this.presenceUpdateInterval);
      this.presenceUpdateInterval = null;
    }
  }

  private sendPresenceUpdate(): void {
    this.realTimeManager.emit('presence_update', {
      status: 'online',
      lastSeen: Date.now(),
      activity: this.getCurrentActivity(),
    });
  }

  private getCurrentActivity(): string {
    // Determine current user activity
    return 'active'; // Could be 'typing', 'viewing', 'idle', etc.
  }

  setUserStatus(status: 'online' | 'away' | 'busy' | 'offline'): void {
    this.realTimeManager.emit('status_change', {
      status,
      timestamp: Date.now(),
    });
  }

  subscribeToPresence(userId: string): void {
    this.realTimeManager.emit('subscribe_presence', { userId });
  }

  unsubscribeFromPresence(userId: string): void {
    this.realTimeManager.emit('unsubscribe_presence', { userId });
  }

  private handlePresenceUpdate(data: any): void {
    const { userId, status, lastSeen, activity } = data;
    
    this.userStatuses.set(userId, {
      status,
      lastSeen: new Date(lastSeen),
      activity,
      updatedAt: new Date(),
    });

    this.notifyListeners('presence_updated', { userId, status, activity });
  }

  private handleUserJoined(data: any): void {
    this.notifyListeners('user_joined', data);
  }

  private handleUserLeft(data: any): void {
    this.notifyListeners('user_left', data);
  }

  getUserStatus(userId: string): UserStatus | null {
    return this.userStatuses.get(userId) || null;
  }

  addListener(listener: PresenceListener): void {
    this.listeners.add(listener);
  }

  removeListener(listener: PresenceListener): void {
    this.listeners.delete(listener);
  }

  private notifyListeners(event: string, data: any): void {
    this.listeners.forEach(listener => {
      if (listener[event]) {
        listener[event](data);
      }
    });
  }
}

interface UserStatus {
  status: 'online' | 'away' | 'busy' | 'offline';
  lastSeen: Date;
  activity: string;
  updatedAt: Date;
}

interface PresenceListener {
  presence_updated?(data: any): void;
  user_joined?(data: any): void;
  user_left?(data: any): void;
}

Typing Indicators

swift
class TypingIndicatorManager {
    private let realTimeManager: RealTimeManager
    private var typingTimer: Timer?
    private let typingTimeout: TimeInterval = 3.0
    private var isTyping = false
    
    init(realTimeManager: RealTimeManager) {
        self.realTimeManager = realTimeManager
        setupTypingHandlers()
    }
    
    private func setupTypingHandlers() {
        realTimeManager.on("user_typing") { [weak self] data in
            self?.handleUserTyping(data)
        }
        
        realTimeManager.on("user_stopped_typing") { [weak self] data in
            self?.handleUserStoppedTyping(data)
        }
    }
    
    func startTyping(in roomId: String) {
        guard !isTyping else { return }
        
        isTyping = true
        realTimeManager.emit("start_typing", ["roomId": roomId])
        
        // Reset typing timer
        typingTimer?.invalidate()
        typingTimer = Timer.scheduledTimer(withTimeInterval: typingTimeout, repeats: false) { [weak self] _ in
            self?.stopTyping(in: roomId)
        }
    }
    
    func stopTyping(in roomId: String) {
        guard isTyping else { return }
        
        isTyping = false
        typingTimer?.invalidate()
        typingTimer = nil
        
        realTimeManager.emit("stop_typing", ["roomId": roomId])
    }
    
    func updateTyping(in roomId: String) {
        if isTyping {
            // Reset the timer
            typingTimer?.invalidate()
            typingTimer = Timer.scheduledTimer(withTimeInterval: typingTimeout, repeats: false) { [weak self] _ in
                self?.stopTyping(in: roomId)
            }
        } else {
            startTyping(in: roomId)
        }
    }
    
    private func handleUserTyping(_ data: [String: Any]) {
        guard let userId = data["userId"] as? String,
              let roomId = data["roomId"] as? String else { return }
        
        NotificationCenter.default.post(
            name: .userStartedTyping,
            object: nil,
            userInfo: ["userId": userId, "roomId": roomId]
        )
    }
    
    private func handleUserStoppedTyping(_ data: [String: Any]) {
        guard let userId = data["userId"] as? String,
              let roomId = data["roomId"] as? String else { return }
        
        NotificationCenter.default.post(
            name: .userStoppedTyping,
            object: nil,
            userInfo: ["userId": userId, "roomId": roomId]
        )
    }
}

extension Notification.Name {
    static let userStartedTyping = Notification.Name("userStartedTyping")
    static let userStoppedTyping = Notification.Name("userStoppedTyping")
}

Performance Optimization

Connection Pooling and Load Balancing

javascript
// Server-side Node.js implementation
const cluster = require('cluster');
const sticky = require('sticky-session');
const socketio = require('socket.io');
const redis = require('redis');
const redisAdapter = require('socket.io-redis');

class RealTimeServer {
  constructor() {
    this.redisClient = redis.createClient(process.env.REDIS_URL);
    this.connections = new Map();
    this.rooms = new Map();
  }

  initialize() {
    const server = require('http').createServer();
    const io = socketio(server, {
      transports: ['websocket'],
      pingTimeout: 60000,
      pingInterval: 25000,
      upgradeTimeout: 30000,
      maxHttpBufferSize: 1e6, // 1MB
      cors: {
        origin: process.env.ALLOWED_ORIGINS?.split(',') || "*",
        methods: ["GET", "POST"]
      }
    });

    // Use Redis adapter for horizontal scaling
    io.adapter(redisAdapter({ 
      host: process.env.REDIS_HOST, 
      port: process.env.REDIS_PORT 
    }));

    io.use(this.authenticateSocket.bind(this));
    io.on('connection', this.handleConnection.bind(this));

    return server;
  }

  async authenticateSocket(socket, next) {
    try {
      const token = socket.handshake.auth.token;
      const user = await this.verifyToken(token);
      
      if (user) {
        socket.userId = user.id;
        socket.userData = user;
        next();
      } else {
        next(new Error('Authentication failed'));
      }
    } catch (error) {
      next(error);
    }
  }

  handleConnection(socket) {
    console.log(`User ${socket.userId} connected`);
    
    // Store connection
    this.connections.set(socket.userId, {
      socket,
      connectedAt: new Date(),
      lastActivity: new Date()
    });

    // Update user presence
    this.updateUserPresence(socket.userId, 'online');

    socket.on('join_room', (data) => this.handleJoinRoom(socket, data));
    socket.on('leave_room', (data) => this.handleLeaveRoom(socket, data));
    socket.on('send_message', (data) => this.handleMessage(socket, data));
    socket.on('start_typing', (data) => this.handleStartTyping(socket, data));
    socket.on('stop_typing', (data) => this.handleStopTyping(socket, data));
    socket.on('ping', () => this.handlePing(socket));

    socket.on('disconnect', () => this.handleDisconnection(socket));
  }

  handleJoinRoom(socket, { roomId }) {
    socket.join(roomId);
    
    // Track room membership
    if (!this.rooms.has(roomId)) {
      this.rooms.set(roomId, new Set());
    }
    this.rooms.get(roomId).add(socket.userId);

    // Notify other users in the room
    socket.to(roomId).emit('user_joined', {
      userId: socket.userId,
      userData: socket.userData,
      timestamp: Date.now()
    });

    // Send room info to the joining user
    socket.emit('room_joined', {
      roomId,
      members: Array.from(this.rooms.get(roomId)),
      timestamp: Date.now()
    });
  }

  handleMessage(socket, messageData) {
    const { roomId, message, messageId } = messageData;
    
    // Validate and sanitize message
    if (!this.validateMessage(message)) {
      socket.emit('error', { message: 'Invalid message format' });
      return;
    }

    // Store message in database
    this.storeMessage({
      id: messageId,
      roomId,
      senderId: socket.userId,
      content: message,
      timestamp: Date.now()
    });

    // Broadcast to room members
    socket.to(roomId).emit('new_message', {
      messageId,
      message,
      sender: socket.userData,
      roomId,
      timestamp: Date.now()
    });

    // Update last activity
    this.updateLastActivity(socket.userId);
  }

  async storeMessage(messageData) {
    // Store in database
    try {
      await this.database.messages.create(messageData);
      
      // Also cache recent messages in Redis
      await this.redisClient.lpush(
        `room:${messageData.roomId}:messages`,
        JSON.stringify(messageData)
      );
      
      // Keep only last 100 messages in cache
      await this.redisClient.ltrim(
        `room:${messageData.roomId}:messages`,
        0, 99
      );
    } catch (error) {
      console.error('Failed to store message:', error);
    }
  }

  updateUserPresence(userId, status) {
    this.redisClient.setex(`presence:${userId}`, 300, JSON.stringify({
      status,
      lastSeen: Date.now(),
      serverId: process.env.SERVER_ID
    }));

    // Broadcast presence update
    this.io.emit('presence_update', {
      userId,
      status,
      timestamp: Date.now()
    });
  }
}

// Start server with sticky sessions for load balancing
if (!sticky.listen(new RealTimeServer().initialize(), process.env.PORT || 3000)) {
  // Master process
  console.log('Master process started');
} else {
  // Worker process
  console.log(`Worker ${process.pid} started`);
}

Error Handling and Resilience

Comprehensive Error Management

typescript
class ResilientRealTimeManager {
  private connectionRetryStrategy: RetryStrategy;
  private messageRetryStrategy: RetryStrategy;
  private circuitBreaker: CircuitBreaker;
  private messageQueue: MessageQueue;
  
  constructor(config: RealTimeConfig) {
    this.connectionRetryStrategy = new ExponentialBackoffRetry({
      maxAttempts: 5,
      baseDelay: 1000,
      maxDelay: 30000,
      jitter: true
    });
    
    this.messageRetryStrategy = new LinearRetry({
      maxAttempts: 3,
      delay: 2000
    });
    
    this.circuitBreaker = new CircuitBreaker({
      failureThreshold: 5,
      timeoutThreshold: 10000,
      resetTimeout: 60000
    });
    
    this.messageQueue = new MessageQueue({
      maxSize: 1000,
      persistence: true
    });
  }

  async connect(): Promise<void> {
    return this.connectionRetryStrategy.execute(async () => {
      if (this.circuitBreaker.isOpen()) {
        throw new Error('Circuit breaker is open');
      }
      
      try {
        await this.establishConnection();
        this.circuitBreaker.recordSuccess();
      } catch (error) {
        this.circuitBreaker.recordFailure();
        throw error;
      }
    });
  }

  async sendMessage(message: any): Promise<void> {
    if (!this.isConnected()) {
      this.messageQueue.enqueue(message);
      return;
    }

    return this.messageRetryStrategy.execute(async () => {
      try {
        await this.transmitMessage(message);
      } catch (error) {
        // If sending fails, queue for retry
        this.messageQueue.enqueue(message);
        throw error;
      }
    });
  }

  private async processQueuedMessages(): Promise<void> {
    while (!this.messageQueue.isEmpty() && this.isConnected()) {
      const message = this.messageQueue.dequeue();
      try {
        await this.transmitMessage(message);
      } catch (error) {
        // Re-queue with increased retry count
        this.messageQueue.enqueue(message, { increaseRetryCount: true });
        break; // Stop processing if one fails
      }
    }
  }
}

class ExponentialBackoffRetry implements RetryStrategy {
  constructor(private config: ExponentialBackoffConfig) {}

  async execute<T>(operation: () => Promise<T>): Promise<T> {
    let attempt = 0;
    let delay = this.config.baseDelay;

    while (attempt < this.config.maxAttempts) {
      try {
        return await operation();
      } catch (error) {
        attempt++;
        
        if (attempt >= this.config.maxAttempts) {
          throw new Error(`Operation failed after ${attempt} attempts: ${error.message}`);
        }

        // Calculate next delay with jitter
        if (this.config.jitter) {
          delay = delay * (0.5 + Math.random() * 0.5);
        }
        delay = Math.min(delay * 2, this.config.maxDelay);

        await this.sleep(delay);
      }
    }

    throw new Error('Retry execution should not reach here');
  }

  private sleep(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}

class CircuitBreaker {
  private state: 'CLOSED' | 'OPEN' | 'HALF_OPEN' = 'CLOSED';
  private failureCount = 0;
  private lastFailureTime = 0;

  constructor(private config: CircuitBreakerConfig) {}

  isOpen(): boolean {
    if (this.state === 'OPEN') {
      // Check if reset timeout has passed
      if (Date.now() - this.lastFailureTime > this.config.resetTimeout) {
        this.state = 'HALF_OPEN';
        return false;
      }
      return true;
    }
    return false;
  }

  recordSuccess(): void {
    this.failureCount = 0;
    this.state = 'CLOSED';
  }

  recordFailure(): void {
    this.failureCount++;
    this.lastFailureTime = Date.now();
    
    if (this.failureCount >= this.config.failureThreshold) {
      this.state = 'OPEN';
    }
  }
}

Best Practices

1. Connection Management

  • Implement automatic reconnection with exponential backoff
  • Use heartbeat/ping mechanisms to detect connection health
  • Handle network state changes gracefully
  • Implement connection pooling for multiple rooms/channels

2. Message Reliability

  • Queue messages when offline for later delivery
  • Implement message acknowledgments for critical messages
  • Use unique message IDs to prevent duplicates
  • Store important messages locally until confirmed delivered

3. Performance Optimization

typescript
class RealTimeOptimizer {
  // Batch small messages to reduce overhead
  static batchMessages(messages: any[], maxBatchSize: number = 10): any[][] {
    const batches = [];
    for (let i = 0; i < messages.length; i += maxBatchSize) {
      batches.push(messages.slice(i, i + maxBatchSize));
    }
    return batches;
  }

  // Throttle typing indicators
  static createTypingThrottle(callback: Function, delay: number = 1000) {
    let timeout: NodeJS.Timeout | null = null;
    let isTyping = false;

    return () => {
      if (!isTyping) {
        callback(true);
        isTyping = true;
      }

      if (timeout) {
        clearTimeout(timeout);
      }

      timeout = setTimeout(() => {
        callback(false);
        isTyping = false;
      }, delay);
    };
  }

  // Debounce presence updates
  static createPresenceDebounce(callback: Function, delay: number = 5000) {
    let timeout: NodeJS.Timeout | null = null;

    return (status: string) => {
      if (timeout) {
        clearTimeout(timeout);
      }

      timeout = setTimeout(() => {
        callback(status);
      }, delay);
    };
  }
}

4. Security Considerations

  • Authenticate all WebSocket connections
  • Validate and sanitize all incoming messages
  • Implement rate limiting to prevent abuse
  • Use secure WebSocket (WSS) in production
  • Validate user permissions for room access

5. Monitoring and Analytics

typescript
class RealTimeAnalytics {
  private metrics: RealTimeMetrics = {
    connectionsPerSecond: 0,
    messagesPerSecond: 0,
    averageLatency: 0,
    errorRate: 0,
    activeConnections: 0
  };

  trackConnection(userId: string, connectionTime: number): void {
    this.metrics.activeConnections++;
    this.recordMetric('connection_established', {
      userId,
      connectionTime,
      timestamp: Date.now()
    });
  }

  trackMessage(messageId: string, latency: number): void {
    this.metrics.messagesPerSecond++;
    this.updateAverageLatency(latency);
    
    this.recordMetric('message_sent', {
      messageId,
      latency,
      timestamp: Date.now()
    });
  }

  trackError(error: Error, context: any): void {
    this.metrics.errorRate++;
    this.recordMetric('error_occurred', {
      error: error.message,
      context,
      timestamp: Date.now()
    });
  }

  private updateAverageLatency(newLatency: number): void {
    // Use moving average
    this.metrics.averageLatency = 
      (this.metrics.averageLatency * 0.9) + (newLatency * 0.1);
  }

  generateReport(): RealTimeReport {
    return {
      metrics: this.metrics,
      timestamp: Date.now(),
      recommendations: this.generateRecommendations()
    };
  }
}

Conclusion

Real-time communication is essential for modern mobile applications, enabling features like chat, live updates, and collaborative experiences. Success requires:

  • Robust connection management with automatic reconnection and error handling
  • Platform-specific optimizations for iOS, Android, React Native, and Flutter
  • Scalable server architecture with load balancing and horizontal scaling
  • Performance optimization through message batching, throttling, and caching
  • Security measures including authentication, validation, and rate limiting
  • Comprehensive monitoring to track performance and user experience metrics

The key to successful real-time communication is building resilient systems that gracefully handle network issues while providing smooth, responsive user experiences across all platforms.

Created by Eren Demir.