import { Observable, Subject } from 'rxjs';
import { filter, takeUntil, repeat, map } from 'rxjs/operators';
import { connectWs } from '../../../utils/ws';
import { onSubscriptionMsg, onPongMsg } from '.';
var ws;
var wsInstance$ = new Subject();
var reconnect$ = new Subject();

var makeDataStream = function makeDataStream(wsUrl, options) {
  ws = connectWs(wsUrl, {
    initSubs: options && options.initSubs || {},
    keepAlive: {
      msg: 'ping'
    },
    onPongCb: onPongMsg,
    onSubscriptionCb: onSubscriptionMsg,
    onReconnectCb: function onReconnectCb(err, data) {
      ws = data;
      reconnect$.next();
    }
  });
  var dataFeed$ = Observable.create(function (observer) {
    ws.addEventListener('message', function (event) {
      observer.next(event);
    });
    wsInstance$.next(ws);
    return function () {
      if (ws.readyState === 1) {
        ws.close();
        console.warn('Bitmex WS closed');
      }

      console.warn('Bitmex dataFeed$ closed');
    };
  }).pipe(filter(function (event) {
    return event.data !== 'pong';
  }), map(function (event) {
    var data = JSON.parse(event.data);

    if (data.table) {
      return event;
    }

    return null;
  }), filter(function (event) {
    return event;
  }), takeUntil(reconnect$), repeat());
  return [wsInstance$, dataFeed$];
};

export default makeDataStream;