import {
    RSocketClient,
  } from 'rsocket-core';
  import type {ReactiveSocket, Payload, ConnectionStatus } from 'rsocket-types';
  import { Flowable, Single } from 'rsocket-flowable';
  
  export default class ReconnectableRSocket<D, M> implements ReactiveSocket<D, M> {
  
    socket: ReactiveSocket<D, M>;
    clientFactory: () => RSocketClient<D, M>;
    errorCallback: (error: string)=>void;
  
    constructor(clientFactory: () => RSocketClient<D, M>, errorCallback) {
      this.clientFactory = clientFactory;
      this.errorCallback = errorCallback;
      //this.connect();
    }
    async reconnect():Promise<void>{
      await new Promise(resolve => setTimeout(resolve, 10000)); // 10 sec
      this.errorCallback("disconnected");
      this.connect();
    }
  //delay = ms => new Promise<void>(res => setTimeout(res, ms));
   async connect():Promise<void>{
     try{
       //if (this.socket!=null) this.socket.close();
       //if (this.clientFactory!=null) this.clientFactory().close();
     this.socket = await this.clientFactory().connect();
     }
     catch(error){
       this.reconnect();
       console.log(error);
       return;
     }
     if (this.socket==null || this.socket.connectionStatus == null){
        this.reconnect();
        return;
       // this.connect();
     }
     //console.log('Reconnectable Rsocket connected');
    this.socket.connectionStatus().subscribe(event => {
            if (event.kind !== 'CONNECTED') {
              console.log(event.kind);
              this.socket = null;
              this.reconnect();
              return;
            }
          });
    }
 /* async connect():Promise<void> {
      this.clientFactory().connect().then(
        socket => {
          this.socket = socket;
          socket.connectionStatus().subscribe(event => {
            if (event.kind !== 'CONNECTED') {
              console.log(event.kind);
              this.socket = null;
              this.connect();
            }
          });
        },
        error => {
          this.connect();
          console.log(error);
      }
      );
    }*/
  
  
  
    fireAndForget(payload: Payload<D, M>): void {
      if (!this.socket) {
        throw new Error('Not Connected yet. Retry later');
      }
  
      this.socket.fireAndForget(payload);
    }
  
    requestResponse(payload: Payload<D, M>): Single<Payload<D, M>> {
      if (!this.socket) {
        return Single.error(new Error('Not Connected yet. Retry later'));
      }
  
      return this.socket.requestResponse(payload);
    }
  
    requestStream(payload: Payload<D, M>): Flowable<Payload<D, M>> {
      if (!this.socket) {
        return Flowable.error(new Error('Not Connected yet. Retry later'));
      }
  
      return this.socket.requestStream(payload);
    }
  
    requestChannel(payloads: Flowable<Payload<D, M>>): Flowable<Payload<D, M>> {
      if (!this.socket) {
        return Flowable.error(new Error('Not Connected yet. Retry later'));
      }
  
      return this.socket.requestChannel(payloads);
    }
  
    metadataPush(payload: Payload<D, M>): Single<void> {
      if (!this.socket) {
        return Single.error(new Error('Not Connected yet. Retry later'));
      }
  
      return this.socket.metadataPush(payload);
    }
    close(): void {
      this.socket.close();
    }
  
    connectionStatus(): Flowable<ConnectionStatus> {
      return this.socket.connectionStatus();
    }
  
    availability(): number {
      return this.socket.availability();
    }
  
  }


