import { Injectable, NgZone, OnDestroy } from '@angular/core';
import {
  BehaviorSubject,
  combineLatest,
  Observable,
  Subject,
  map,
  of,
  merge,
} from 'rxjs';
import { environment } from '@envs/environment';
import { EventSourcePolyfill } from 'event-source-polyfill';
import { AuthSelectors } from '@app/store/auth/auth.selectors';
import { Select } from '@ngxs/store';
import { take, takeUntil, tap, throttleTime } from 'rxjs/operators';
import { ApiService } from './api.service';
import { SubscriptionAPIResponse } from '@app/modules/mercure/interfaces/subscription-api-response';
import { User } from '@app/modules/users/interfaces/user';

// Shadow the global `EventSource` value with the polyfill so that every
// `new EventSource(...)` in this file constructs an `EventSourcePolyfill`.
// The service passes a `headers` option (Authorization) to the constructor;
// presumably the polyfill is used because the native EventSource constructor
// has no such option — confirm against the polyfill's documentation.
const EventSource = EventSourcePolyfill;

/**
 * Application-wide service that keeps a Server-Sent-Events connection to the
 * Mercure hub (`environment.mercure_url + '/.well-known/mercure'`) and
 * republishes every received message on {@link SseService.updates$}.
 *
 * The main connection is (re)created whenever the topic or the token selected
 * from the store changes, or when a reconnect is requested after a connection
 * error. All open connections are closed in {@link SseService.ngOnDestroy}.
 */
@Injectable({
  providedIn: 'root',
})
export class SseService implements OnDestroy {
  private host = environment.mercure_url;
  private updates = new BehaviorSubject({});
  /** JSON-parsed `data` of every SSE message; starts with an empty object. */
  public updates$ = this.updates.asObservable();

  @Select(AuthSelectors.mercureTopic)
  topic$: Observable<string>;

  @Select(AuthSelectors.mercureSubscriptions)
  mercureSubscriptions$: Observable<string>;

  @Select(AuthSelectors.token)
  token$: Observable<string>;

  /** Main updates connection, or `null` while none is open. */
  private src: EventSource | null = null;
  /** Connection created by {@link SseService.monitorUserConnections}, if any. */
  private srcUserConnections: EventSource | null = null;

  /** ID of the last message received on the main connection. */
  private lastEventID: string | null = null;

  /** Emits when the main connection reports an error and should be rebuilt. */
  private reconnect$ = new Subject<void>();
  /** Emits once when the service is destroyed. */
  private destroy$ = new Subject<void>();

  constructor(private zone: NgZone, private apiService: ApiService) {
    this.subscribeToRealTimeUpdates();
  }

  /**
   * Subscribes to topic, token and reconnect requests and opens a fresh main
   * connection each time any of them emits, provided both the topic and the
   * token are truthy. The subscription ends when the service is destroyed.
   */
  subscribeToRealTimeUpdates(): void {
    const topic$ = this.topic$.pipe(
      tap(() => {
        console.log('NEW TOPIC');
      })
    );
    const token$ = this.token$.pipe(
      tap(() => {
        console.log('NEW TOKEN');
      })
    );
    // `of(true)` lets combineLatest fire before any reconnect was requested;
    // reconnect requests are throttled to at most one per 5 seconds.
    const trigger$ = merge(
      of(true),
      this.reconnect$.pipe(
        throttleTime(5000),
        tap(() => {
          console.log('NEW RECONNECT');
        })
      )
    );

    combineLatest([topic$, token$, trigger$])
      .pipe(takeUntil(this.destroy$))
      .subscribe(([topic, token]) => {
        // Never log the token itself: it is a bearer credential.
        console.log({ topic, hasToken: Boolean(token) });
        if (token && topic) {
          this.connect(topic, token);
        }
      });
  }

  /**
   * Fires a GET request to `/just-update-refresh-token` and ignores the
   * response. Called on every received message and on every connection error.
   */
  updateRefreshTokenIfNeeded(): void {
    this.apiService.get('/just-update-refresh-token').pipe(take(1)).subscribe(); // hack
  }

  /**
   * Requests `/subscriptions` through the API service.
   *
   * @returns The observable returned by `ApiService.get` for that endpoint.
   */
  getOnlineUsers(): Observable<SubscriptionAPIResponse<User>> {
    return this.apiService.get<SubscriptionAPIResponse<User>>('/subscriptions');
  }

  /**
   * Opens a connection on the hub's subscriptions topic for every token
   * emitted by `token$`, closing the connection previously created by this
   * method first.
   *
   * Note that the connection is created inside the returned observable's
   * pipeline, so nothing is opened until the observable is subscribed to.
   *
   * @param lastEventID Event ID to resume from; when omitted, `null` or empty,
   *   no `lastEventID` query parameter is sent.
   * @returns Observable emitting the newly created event source.
   */
  monitorUserConnections(lastEventID?: string | null): Observable<EventSource> {
    return this.token$.pipe(
      map((token: string) => {
        const subscribeURL = this.buildHubUrl(
          '/.well-known/mercure/subscriptions{/topic}{/subscriber}',
          lastEventID
        );

        if (this.srcUserConnections) {
          this.srcUserConnections.close();
        }

        // Establish connection
        const source = new EventSource(subscribeURL.toString(), {
          headers: {
            Authorization: 'Bearer ' + token,
          },
        });
        this.srcUserConnections = source;

        return source;
      })
    );
  }

  /**
   * Stops reacting to store changes and reconnect requests and closes every
   * connection this service opened, so none of them outlives the service.
   */
  ngOnDestroy(): void {
    this.destroy$.next();
    this.reconnect$.complete();
    this.destroy$.complete();

    if (this.src) {
      this.src.close();
      this.src = null;
    }
    if (this.srcUserConnections) {
      this.srcUserConnections.close();
      this.srcUserConnections = null;
    }
  }

  /**
   * Builds the hub subscription URL for a topic.
   *
   * @param topic Value of the `topic` query parameter.
   * @param lastEventID Optional event ID; appended as `lastEventID` only when
   *   it is a non-empty value.
   */
  private buildHubUrl(topic: string, lastEventID?: string | null): URL {
    const subscribeURL = new URL(this.host + '/.well-known/mercure');
    subscribeURL.searchParams.append('topic', topic);
    if (lastEventID) {
      subscribeURL.searchParams.append('lastEventID', lastEventID);
    }
    return subscribeURL;
  }

  /**
   * Replaces the main connection with a new one for the given topic,
   * authenticated with the given token, resuming from the last seen event ID
   * when one is known.
   */
  private connect(topic: string, token: string): void {
    if (this.lastEventID) {
      console.log(
        'Using last event ID for the new connection: ',
        this.lastEventID
      );
    }
    const subscribeURL = this.buildHubUrl(topic, this.lastEventID);

    if (this.src) {
      console.log('CLOSE PREVIOUS CONNECTION');
      this.src.close();
    }

    console.log('CONNECTING...');
    // The source object is intentionally not logged: it may carry the request
    // headers, including the Authorization token.
    const source = new EventSource(subscribeURL.toString(), {
      headers: {
        Authorization: 'Bearer ' + token,
      },
    });
    this.src = source;

    source.onopen = (event: Event) => {
      console.log({ onopen: event });
    };
    source.onmessage = (event: MessageEvent) => {
      // Remember the position so a later reconnect can resume from it.
      this.lastEventID = event.lastEventId;
      console.log('An SSE result:');
      console.log(event);
      this.updateRefreshTokenIfNeeded();
      // Publish inside the Angular zone so change detection runs.
      this.zone.run(() => {
        this.updates.next(JSON.parse(event.data));
      });
    };
    source.onerror = (error: any) => {
      console.log('An SSE error occured:');
      console.log(error);
      this.updateRefreshTokenIfNeeded();
      // Request a rebuilt connection; throttled in subscribeToRealTimeUpdates.
      this.reconnect$.next();
    };
  }
}
