import { Observable, Subject } from 'rxjs';
import { filter, takeUntil, repeat } from 'rxjs/operators';
import { connectWs } from '../../../utils/ws';
import { onSubscriptionMsg, onPongMsg } from '.';
// Emits (with no value) each time connectWs reports a reconnect through
// `onReconnectCb`; makeDataStream uses it as the `takeUntil` notifier.
var reconnect$ = new Subject();

// Current socket shared by this module. It is assigned by makeDataStream and
// replaced inside `onReconnectCb`, so it stays undefined until the first call.
var ws;

/**
 * Connects to `wsUrl` through `connectWs` and exposes the socket's raw
 * `message` events as an Observable.
 *
 * Each emission of `reconnect$` completes the current inner subscription
 * (`takeUntil`) and `repeat()` immediately resubscribes, which attaches a
 * fresh listener to whatever socket the module-level `ws` refers to then.
 *
 * NOTE: the socket is stored in the module-level `ws`, so every call to this
 * function replaces the socket reference used by previously created streams.
 *
 * @param {string} wsUrl - Endpoint handed to `connectWs`.
 * @param {Object} [options]
 * @param {Object} [options.initSubs] - Forwarded to `connectWs` as `initSubs`
 *   (falls back to `{}` when falsy).
 * @param {{next: Function}} [options.wsInstance$] - Optional sink (e.g. a
 *   Subject) that receives the current socket on open and on every
 *   (re)subscription. When omitted, nothing is published.
 * @returns {Observable} Stream of the events delivered to the socket's
 *   `message` listener.
 */
var makeDataStream = function makeDataStream(wsUrl) {
  var options = arguments.length > 1 && arguments[1] !== undefined ? arguments[1] : {};
  var initSubs = options.initSubs,
      wsInstance$ = options.wsInstance$;

  // `options` is optional, so `wsInstance$` may be missing; publishing must
  // then be a no-op instead of throwing on `undefined.next`.
  var publishSocket = function publishSocket() {
    if (wsInstance$) {
      wsInstance$.next(ws);
    }
  };

  ws = connectWs(wsUrl, {
    initSubs: initSubs || {},
    keepAlive: {
      msg: {
        event: 'ping'
      }
    },
    onPongCb: onPongMsg,
    onSubscriptionCb: onSubscriptionMsg,
    // presumably `data` is the replacement socket created by connectWs —
    // verify against utils/ws.
    onReconnectCb: function onReconnectCb(err, data) {
      ws = data;
      // Completes the running subscription; `repeat()` then resubscribes.
      reconnect$.next();
    },
    onOpenCb: function onOpenCb() {
      publishSocket();
    }
  });

  var dataFeed$ = new Observable(function (observer) {
    // Capture the socket and handler used by THIS subscription: by the time
    // the teardown runs, `ws` may already point at a different socket.
    var socket = ws;

    var onMessage = function onMessage(event) {
      observer.next(event);
    };

    socket.addEventListener('message', onMessage);
    publishSocket();
    return function () {
      // Detach the listener so unsubscribing / resubscribing does not leak
      // handlers on the socket. Guarded because the object returned by
      // connectWs is opaque from here.
      if (typeof socket.removeEventListener === 'function') {
        socket.removeEventListener('message', onMessage);
      }

      // NOTE(review): this closes the *current* module-level socket when it
      // is OPEN (readyState 1), unchanged from the previous behavior. After
      // a reconnect that may be the new socket — confirm this is intended.
      if (ws.readyState === 1) {
        ws.close();
        console.warn('Bitfinex WS closed');
      }

      console.warn('Bitfinex dataFeed$ closed');
    };
  }).pipe(filter(function (msg) {
    // Drop falsy emissions before they reach subscribers.
    return msg;
  }), takeUntil(reconnect$), repeat());
  return dataFeed$;
};

export default makeDataStream;