import {Injectable} from "@angular/core";
import {Subject, Observable, tap} from "rxjs";
import {map, filter, finalize} from "rxjs/operators";
import {SocketEvent} from "./socket-event.models";

/** Wire format of a frame on the socket: an event name plus that event's payload. */
type SocketFrame = {event: string, msg: SocketEvent<any>};

/**
 * Thin event-oriented wrapper around a single browser `WebSocket`.
 *
 * The socket is opened as soon as the service is constructed. Incoming frames
 * are expected to be JSON of the shape `{event, msg}`; `event()` exposes the
 * payloads of one event name as an Observable, and `emit()` sends a frame of
 * the same shape. There is no reconnect logic: once the socket closes (or
 * errors), every stream returned by `event()` completes (or errors).
 */
export class WebsocketService {
  /** Raw socket messages; errors and completes together with the socket. */
  private readonly wsSubject = new Subject<MessageEvent>();
  private readonly url: string;
  private readonly ws: WebSocket;
  /** Number of currently active subscriptions to streams returned by `event()`. */
  private listenerCount = 0;

  /**
   * @param url WebSocket endpoint to connect to; the connection attempt starts immediately.
   */
  constructor(url: string) {
    this.url = url;
    // Assigned here (not inside connect()) so the field is definitely
    // initialised under `strictPropertyInitialization`.
    this.ws = this.connect();
  }

  /**
   * Opens the socket and forwards its message/error/close callbacks into `wsSubject`.
   *
   * @returns The newly created socket.
   */
  private connect(): WebSocket {
    const socket = new WebSocket(this.url);
    socket.onmessage = (e: MessageEvent) => this.wsSubject.next(e);
    socket.onerror = (e: Event) => this.wsSubject.error(e);
    socket.onclose = () => this.wsSubject.complete();
    return socket;
  }

  /**
   * Decodes one raw socket message into a frame.
   *
   * @returns The decoded frame, or `undefined` when the data is not JSON or
   *          does not decode to an object. Such frames are logged and skipped
   *          so that a single bad message cannot error every listener.
   */
  private static parseFrame(e: MessageEvent): SocketFrame | undefined {
    try {
      const parsed: unknown = JSON.parse(e.data);
      if (typeof parsed === "object" && parsed !== null) {
        return parsed as SocketFrame;
      }
    } catch {
      // Not valid JSON; reported below.
    }
    console.warn("WebsocketService: ignoring malformed frame", e.data);
    return undefined;
  }

  /**
   * Streams the payloads of all incoming frames whose `event` field equals `event`.
   *
   * Every emitted payload is also written to the console. When the last active
   * subscription (across all streams returned by this method) ends, the
   * underlying socket is closed.
   *
   * @param event Event name to listen for.
   * @returns Observable of the `msg` payloads of matching frames.
   */
  event(event: string): Observable<SocketEvent<any>> {
    const payloads = this.wsSubject.pipe(
      map((e: MessageEvent): SocketFrame | undefined => WebsocketService.parseFrame(e)),
      filter((frame: SocketFrame | undefined): frame is SocketFrame =>
        frame !== undefined && frame.event === event),
      map((frame: SocketFrame): SocketEvent<any> => frame.msg),
      tap((e: SocketEvent<any>) => console.log(e))
    );

    return new Observable<SocketEvent<any>>((subscriber) => {
      // Count listeners explicitly rather than inspecting `Subject.observers`:
      // that list is an RxJS internal, and whether the finishing subscriber is
      // still in it when `finalize` runs differs between RxJS versions.
      this.listenerCount++;
      return payloads.pipe(
        finalize(() => {
          // close connection if there are no more observers
          this.listenerCount--;
          if (this.listenerCount === 0) {
            this.ws.close();
          }
        })
      ).subscribe(subscriber);
    });
  }

  /**
   * Sends `{event, msg}` as a JSON frame.
   *
   * The frame is silently dropped when the socket is not open.
   *
   * @param event Event name to put in the frame.
   * @param msg Payload to put in the frame.
   */
  emit(event: string, msg: SocketEvent<any>) {
    if (this.isConnected()) {
      this.ws.send(JSON.stringify({event, msg}));
    }
  }

  /** @returns `true` while the socket's ready state is `OPEN`. */
  isConnected(): boolean {
    return this.ws.readyState === WebSocket.OPEN;
  }
}

/**
 * Root-provided factory for `WebsocketService` instances.
 *
 * `WebsocketService` takes its URL as a plain constructor argument, so it is
 * created through this injectable factory rather than injected directly.
 */
@Injectable({providedIn: "root"})
export class WebsocketServiceFactory {
  /**
   * Builds a new service for the given endpoint.
   *
   * @param url WebSocket endpoint passed to the `WebsocketService` constructor.
   * @returns A fresh `WebsocketService`; no instance is cached or shared between calls.
   */
  create(url: string): WebsocketService {
    const service = new WebsocketService(url);
    return service;
  }
}
