Refactor websocket into separate services, clean up socket status communication (#4433)

* Refactor websocket into separate services, clean up socket status communication

* cleanup

* add EOF lines

* fix keepalive logged in check

* undo change

* fix keepalive connection check

* cleanup

* add typings

* secure connection

Co-authored-by: Jeremy Letto <jeremy.letto@datasite.com>
This commit is contained in:
Jeremy Letto 2021-10-17 00:07:30 -05:00 committed by GitHub
parent 19333c53f6
commit e9ba195d7d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
52 changed files with 815 additions and 757 deletions

View file

@ -0,0 +1,43 @@
import { Subject } from "rxjs";
import { WebSocketService } from "./WebSocketService";
export class KeepAliveService {
private socket: WebSocketService;
private keepalivecb: NodeJS.Timeout;
private lastPingPending: boolean;
public disconnected$ = new Subject<void>();
constructor(socket: WebSocketService) {
this.socket = socket;
}
public startPingLoop(interval: number, ping: Function): void {
this.keepalivecb = setInterval(() => {
// check if the previous ping got no reply
if (this.lastPingPending) {
this.disconnected$.next();
}
// stop the ping loop if we"re disconnected
if (!this.socket.checkReadyState(WebSocket.OPEN)) {
this.endPingLoop();
return;
}
this.lastPingPending = true;
ping(() => this.lastPingPending = false);
}, interval);
}
public endPingLoop() {
clearInterval(this.keepalivecb);
this.keepalivecb = null;
}
public resetPingFlag() {
this.lastPingPending = false;
}
}

View file

@ -0,0 +1,132 @@
import protobuf from "protobufjs";
import ProtoFiles from "../ProtoFiles";
import { WebClient } from "../WebClient";
import { RoomEvents, SessionEvents } from '../events';
export interface ProtobufEvents {
[event: string]: Function;
}
export class ProtobufService {
static PB_FILE_DIR = `${process.env.PUBLIC_URL}/pb`;
public controller;
private cmdId = 0;
private pendingCommands: { [cmdId: string]: Function } = {};
private webClient: WebClient;
constructor(webClient: WebClient) {
this.webClient = webClient;
this.loadProtobufFiles();
}
public resetCommands() {
this.cmdId = 0;
this.pendingCommands = {};
}
public sendRoomCommand(roomId: number, roomCmd: number, callback?: Function) {
const cmd = this.controller.CommandContainer.create({
"roomId" : roomId,
"roomCommand" : [ roomCmd ]
});
this.sendCommand(cmd, raw => callback && callback(raw));
}
public sendSessionCommand(sesCmd: number, callback?: Function) {
const cmd = this.controller.CommandContainer.create({
"sessionCommand" : [ sesCmd ]
});
this.sendCommand(cmd, (raw) => callback && callback(raw));
}
public sendModeratorCommand(modCmd: number, callback?: Function) {
const cmd = this.controller.CommandContainer.create({
"moderatorCommand" : [ modCmd ]
});
this.sendCommand(cmd, (raw) => callback && callback(raw));
}
public sendCommand(cmd: number, callback: Function) {
this.cmdId++;
cmd["cmdId"] = this.cmdId;
this.pendingCommands[this.cmdId] = callback;
if (this.webClient.socket.checkReadyState(WebSocket.OPEN)) {
this.webClient.socket.send(this.controller.CommandContainer.encode(cmd).finish());
}
}
public handleMessageEvent({ data }: MessageEvent): void {
try {
const uint8msg = new Uint8Array(data);
const msg = this.controller.ServerMessage.decode(uint8msg);
if (msg) {
switch (msg.messageType) {
case this.controller.ServerMessage.MessageType.RESPONSE:
this.processServerResponse(msg.response);
break;
case this.controller.ServerMessage.MessageType.ROOM_EVENT:
this.processRoomEvent(msg.roomEvent, msg);
break;
case this.controller.ServerMessage.MessageType.SESSION_EVENT:
this.processSessionEvent(msg.sessionEvent, msg);
break;
case this.controller.ServerMessage.MessageType.GAME_EVENT_CONTAINER:
// @TODO
break;
}
}
} catch (err) {
console.error("Processing failed:", err);
}
}
private processServerResponse(response: any) {
const { cmdId } = response;
if (this.pendingCommands[cmdId]) {
this.pendingCommands[cmdId](response);
delete this.pendingCommands[cmdId];
}
}
private processRoomEvent(response: any, raw: any) {
this.processEvent(response, RoomEvents, raw);
}
private processSessionEvent(response: any, raw: any) {
this.processEvent(response, SessionEvents, raw);
}
private processEvent(response: any, events: ProtobufEvents, raw: any) {
for (const event in events) {
const payload = response[event];
if (payload) {
events[event](payload, raw);
return;
}
}
}
private loadProtobufFiles() {
const files = ProtoFiles.map(file => `${ProtobufService.PB_FILE_DIR}/${file}`);
this.controller = new protobuf.Root();
this.controller.load(files, { keepCase: false }, (err, root) => {
if (err) {
throw err;
}
});
}
}

View file

@ -0,0 +1,101 @@
import { Subject } from 'rxjs';
import { ServerStatus, StatusEnum } from "types";
import { KeepAliveService } from "./KeepAliveService";
import { WebClient } from '../WebClient';
export interface WebSocketOptions {
host: string;
port: string;
user: string;
pass: string;
autojoinrooms: boolean;
keepalive: number;
}
export class WebSocketService {
private socket: WebSocket;
private webClient: WebClient;
public keepAliveService: KeepAliveService;
public message$: Subject<MessageEvent> = new Subject();
public statusChange$: Subject<ServerStatus> = new Subject();
private status: StatusEnum = StatusEnum.DISCONNECTED;
private keepalive: number;
constructor(webClient: WebClient) {
this.webClient = webClient;
this.keepAliveService = new KeepAliveService(this);
this.keepAliveService.disconnected$.subscribe(() => {
this.disconnect();
this.updateStatus(StatusEnum.DISCONNECTED, "Connection timeout");
});
}
public connect(options: WebSocketOptions, protocol: string = 'wss'): void {
const { host, port, keepalive } = options;
this.keepalive = keepalive;
this.socket = this.createWebSocket(`${protocol}://${host}:${port}`);
}
public disconnect(): void {
if (this.socket) {
this.socket.close();
}
}
public checkReadyState(state: number): boolean {
return this.socket.readyState === state;
}
public send(message): void {
this.socket.send(message);
}
public updateStatus(status: StatusEnum, description: string): void {
this.status = status;
this.statusChange$.next({status, description});
}
private createWebSocket(url: string): WebSocket {
const socket = new WebSocket(url);
socket.binaryType = "arraybuffer"; // We are talking binary
socket.onopen = () => {
this.updateStatus(StatusEnum.CONNECTED, "Connected");
this.keepAliveService.startPingLoop(this.keepalive, (pingReceived: Function) => {
const command = this.webClient.protobuf.controller.SessionCommand.create({
".Command_Ping.ext" : this.webClient.protobuf.controller.Command_Ping.create()
});
this.webClient.protobuf.sendSessionCommand(command, () => {
pingReceived();
});
});
};
socket.onclose = () => {
// dont overwrite failure messages
if (this.status !== StatusEnum.DISCONNECTED) {
this.updateStatus(StatusEnum.DISCONNECTED, "Connection Closed");
}
this.keepAliveService.endPingLoop();
};
socket.onerror = () => {
this.updateStatus(StatusEnum.DISCONNECTED, "Connection Failed");
};
socket.onmessage = (event: MessageEvent) => {
this.message$.next(event);
}
return socket;
}
}