import type { ISubscriber, ISubscription} from 'rsocket-types';
import { Flowable } from 'rsocket-flowable';

export default class ResubscribeOperator<T> implements ISubscriber<T>, ISubscription {
  source: Flowable<T>;
  actual: ISubscriber<T>;

  done: boolean;
  once: boolean;

  upstream: ISubscription;

  requested: number;

  constructor(source: Flowable<T>, actual: ISubscriber<T>) {
    this.source = source;
    this.actual = actual;
    this.requested = 0;
  }

  onSubscribe(subscription: ISubscription) {
    if (this.done) {
      subscription.cancel();
      return;
    }

    this.upstream = subscription;

    if (!this.once) {
      this.once = true;
      this.actual.onSubscribe(this);
      return;
    }

    subscription.request(this.requested);
  }

  onComplete() {
    if (this.done) {
      return;
    }

    this.done = true;
    this.actual.onComplete();
  }

  onError(error: Error) {
    if (this.done) {
      return;
    }

    this.upstream = null;
    setTimeout(() => this.source.subscribe(this));
  }

  onNext(value: T) {
    if (this.done) {
      return;
    }

    this.requested--;
    this.actual.onNext(value);
  }

  cancel() {
    if (this.done) {
      return;
    }

    this.done = true;

    if (this.upstream) {
      this.upstream = null;
      this.upstream.cancel();
    }
  }
  request(n: number) {
    this.requested += n;
    if (this.upstream) {
      this.upstream.request(n);
    }
  }
}