import {Inject, Injectable} from '@angular/core';
import {BehaviorSubject, interval, Observable, Observer, Subject, SubscriptionLike} from 'rxjs';
import {WebSocketSubject, WebSocketSubjectConfig} from 'rxjs/webSocket';
import {distinctUntilChanged, filter, map, share, takeUntil, takeWhile} from 'rxjs/operators';
import {NotifierService} from 'angular-notifier';
import {AuthService} from './auth.service';

/** Интерфейс конфигурации веб сокета */
export interface WebSocketConfig {
    url: string;
    /** Как часто пытаться подключаться */
    reconnectInterval?: number;
    /** Количество попыток подключения */
    reconnectAttempts?: number;
}

/** Интерфейс отправляемого сообщения */
export interface IWsMessage<T> {
    command: string;
    message: T;
}

/** Интерфейс получаемых сообщений */
export interface IWsReturnedMessage<T> {
    error: boolean;
    payload: T;
    status: number;
    type: string;
    msg: string;
}

export interface IWebsocketService {
    status: Observable<boolean>;

    on<T>(command: string): Observable<T>;

    send(command: string, message: any): void;
}


@Injectable()
export class WebsocketService implements IWebsocketService {

    public onLine$ = new BehaviorSubject(true);
    public connected$ = new BehaviorSubject(false);
    public websocket$: WebSocketSubject<IWsMessage<any> | IWsReturnedMessage<any>>;
    // статус соединения
    public status: Observable<boolean>;
    // объект конфигурации WebSocketSubject
    private config: WebSocketSubjectConfig<IWsMessage<any> | IWsReturnedMessage<any>>;
    private websocketSub: SubscriptionLike;
    private statusSub: SubscriptionLike;
    private wsSub: SubscriptionLike;
    // Observable для реконнекта по interval
    private reconnection$: Observable<number> = null;
    private reconect$ = new Subject<void>();
    private disconnect$ = new Subject();
    // сообщает, когда происходит коннект и реконнект
    private connection$: Observer<boolean>;
    // вспомогательный Observable для работы с подписками на сообщения
    wsMessages$: Subject<IWsMessage<any> | IWsReturnedMessage<any>>;
    // пауза между попытками реконнекта в милисекундах
    private reconnectInterval: number;
    // количество попыток реконнекта
    private reconnectAttempts: number;
    // синхронный вспомогатель для статуса соединения
    private isConnected: boolean;

    public pingPong$ = new BehaviorSubject(false);

    /** config инжектируется */
    constructor(
        @Inject('config') private wsConfig: WebSocketConfig,
        private authService: AuthService,
        private notifierService: NotifierService) {
        this.wsMessages$ = new Subject<IWsMessage<any> | IWsReturnedMessage<any>>();

        // смотрим конфиг, если пусто, задаем умолчания для реконнекта
        this.reconnectInterval = wsConfig.reconnectInterval || 1000;
        this.reconnectAttempts = wsConfig.reconnectAttempts || 1000;

        // получаем данные апи кея
        // при сворачивании коннекта меняем статус connection$ и глушим websocket$
        const date = new Date();
        const timezone = date.getTimezoneOffset();
        this.config = {
            url: wsConfig.url + '/?api_key=' + authService.auth.apiKey + '&tz=' + timezone,
            closeObserver: {
                next: (event: CloseEvent) => {
                    this.websocket$ = null;
                    this.connection$.next(false);
                }
            },
            // при коннекте меняем статус connection$
            openObserver: {
                next: (event: Event) => {
                    this.connection$.next(true);
                }
            }
        };

        // говорим, что что-то пошло не так
        this.websocketSub = this.wsMessages$
            .pipe(takeUntil(this.disconnect$))
            .subscribe(
                null, (error: ErrorEvent) => console.error('WebSocket error!', error)
            );

        setInterval(() => {
            if (this.connected$.value) {
                this.send('ping');
                this.pingPong$.next(true);
            }
        }, 15000);
    }

    public manualCheckConnectionWs(): void {
        if (this.websocket$ !== null && !this.isConnected && !this.reconnection$) {
            console.log('manualCheckConnectionWs fail');
            this.websocket$?.complete();
            this.reconnect();
        }
    }

    public manualReconnect() {
        this.websocket$?.complete();
        this.reconnect();
    }

    public disconnect() {
        this.websocket$?.complete();
        this.wsMessages$.complete();
        this.connection$.complete();
    }

    /**
     * Подписывается на сообщения определенного типа
     *
     * @param type
     */
    public on<T>(type, msg?): Observable<T> {
        return this.wsMessages$.pipe(
            filter((message: IWsReturnedMessage<T>) => message.type === type),
            filter((message: IWsReturnedMessage<T>) => msg ? message.msg === msg : true),
            map((message: IWsReturnedMessage<T>) => message.payload)
        );
    }

    /**
     * Подписывается на сообщения определенного типа
     *
     * @param type
     */
    public onNoPayload<T>(type): Observable<any> {
        return this.wsMessages$.pipe(
            filter((message: IWsReturnedMessage<T>) => message.type === type),
            map((message: IWsReturnedMessage<T>) => message)
        );
    }

    public send(command: string, message: any = {}): void {
        if (command && this.isConnected && this.websocket$) {
            this.websocket$.next({command, message});
        }
    }

    public connect(): void {
        // connection status
        this.status = new Observable<boolean>((observer) => {
            this.connection$ = observer;
        }).pipe(
            takeUntil(this.disconnect$),
            share(),
            distinctUntilChanged()
        );

        // запускаем реконнект при отсутствии соединения
        this.statusSub = this.status
            .pipe(takeUntil(this.disconnect$))
            .subscribe((isConnected) => {

                this.isConnected = isConnected;
                this.connected$.next(isConnected);

                if (!this.reconnection$ && !this.isConnected) {
                    this.reconnect();
                }
            });

        this.websocket$ = new WebSocketSubject(this.config); // создаем
        // если есть сообщения, шлем их в дальше,
        // если нет, ожидаем
        // реконнектимся, если получили ошибку
        this.websocket$
            .pipe(takeUntil(this.disconnect$))
            .subscribe(
                (message) => this.wsMessages$.next(message),
                (error: Event) => {
                    if (!this.websocket$ && this.authService.currentUser$.value) {
                        // run reconnect if errors
                        this.reconnect();
                    }
                });
    }

    private reconnect(): void {
        // Создаем interval со значением из reconnectInterval
        this.reconnection$ = interval(this.reconnectInterval)
            .pipe(takeWhile((v, index) => index < this.reconnectAttempts && !this.websocket$));

        // Пытаемся подключиться пока не подключимся, либо не упремся в ограничение попыток подключения
        this.reconnection$
            .pipe(takeUntil(this.disconnect$))
            .subscribe(
                () => this.connect(),
                null,
                () => {
                    // Subject complete if reconnect attemts ending
                    this.reconnection$ = null;

                    if (!this.websocket$) {
                        this.wsMessages$.complete();
                        this.connection$.complete();
                    }
                });
    }

}
