import { Injectable } from '@angular/core';
import {HttpClient, HttpHeaders, HttpParams, HttpRequest} from '@angular/common/http';
import {BehaviorSubject, EMPTY, Observable, of, Subject, Subscription, timer} from 'rxjs';
import {catchError, delay, delayWhen, map, pairwise, retryWhen, share, startWith, switchAll, tap} from 'rxjs/operators';
import { environment } from '@environment';
import {webSocket, WebSocketSubject} from "rxjs/webSocket";
import {OrdersService} from "@/_services/api/orders.service";

@Injectable({
  providedIn: 'root'
})
export class WebsocketService {

  private socket$: WebSocketSubject<any>;
  private messagesSubject$ = new Subject();
  public messages$ = this.messagesSubject$.pipe(switchAll(), catchError(e => { throw e }));

  private pingTimer: Subscription;
  private pongTimer: Subscription;

  public onMessage;

  public connect(cfg: { reconnect: boolean } = { reconnect: false }): void {
    console.log('called');

    let accessToken = localStorage.getItem('access_token');

    if (!accessToken) {
      console.log("cannot connect, no auth!");
      return;
    }

    let params = new HttpParams().set('token', accessToken);

    if (this.socket$ && !this.socket$.closed) {
      this.socket$.complete();
    }

    this.socket$ = this.getNewWebSocket(environment.websocketUrl + '?' + params.toString());

    const messages = this.socket$.pipe(
      //cfg.reconnect ? this.reconnect.bind(this) : o => o,
      tap({
        next: _ => console.log("hey were here"),
        error: error => console.log(error),
      }),
      catchError(_ => EMPTY),
    );

    this.messagesSubject$.next(messages);

    this.pingTimer = timer(20000, 20000).subscribe(_ => {
      console.log("should ping");
      if (this.socket$ && !this.socket$.closed) {
        console.log("we pinging");
        this.sendMessage({ type: "ping" });

        this.pongTimer = timer(5000).subscribe(_ => {
          this.close();
          this.pongTimer = null;
        });
      }
    }, error => {
      console.log(error);
    });
  }

  sendMessage(msg: any) {
    if (this.socket$ && !this.socket$.closed) {
      this.socket$.next(msg);
    }
  }

  close() {
    if (this.socket$ && !this.socket$.closed) {
      this.socket$.complete();
    }
  }

  constructor(private orders: OrdersService) {
    console.log('created');

    this.messages$.subscribe(message => {
      this.handleMessage(message);
    });

    this.connect({ reconnect: true });
  }

  private reconnect(observable: Observable<any>): Observable<any> {
    return observable.pipe(
      retryWhen(errors => errors.pipe(
        tap(val => {
          // call an api function to eventually refresh token
          console.log('[Data Service] Try to reconnect in 5 seconds...', val);
        }),
        delay(5000)
      ))
    );
  }

  private getNewWebSocket(endpoint) {
    return webSocket({
      url: endpoint,
      closeObserver: {
        next: () => {
          console.log('[DataService]: connection closed');
          this.socket$ = undefined;

          if (this.pingTimer) {
            this.pingTimer.unsubscribe();
            this.pingTimer = null;
          }

          // refresh token if needed
          timer(5000).subscribe(_ => {
            this.connect({ reconnect: true });
          });
        }
      },
    });
  }

  private handleMessage(message) {
    if (this.onMessage)
      this.onMessage(message);

    if (message.type === "pong") {
      if (this.pongTimer) {
        this.pongTimer.unsubscribe();
        this.pongTimer = null;
      }

      return;
    }

    if (message.type === "order") {
      this.orders.updateOrder(message.data);
    }
    else if (message.type === "remove_order") {
      this.orders.removeOrder(message.data);
    }
    else if (message.type === "redisplay_order") {
      this.orders.redisplayOrder(message.data);
    }
    else if (message.type === "refresh") {
      window.location.reload();
    }
  }
}
