import Rx from 'rx';
import 'rx-dom';
import { extend } from './utils';
import multiplex from './multiplex';

// Local aliases for the Rx pieces this module uses.
const { Subject, Observable, Observer, AnonymousSubject } = Rx;
// `Rx.DOM` is presumably installed by the side-effect import of `rx-dom` — confirm.
const { fromWebSocket } = Rx.DOM;

/**
   A subject backed by a WebSocket. Its observer side accepts outgoing messages and its
   observable side emits whatever the socket emits.

   Messages pushed while the socket is not open are queued and flushed, in order, when the
   socket's open notification arrives.

   @class RxSocketSubject
   @constructor
   @param config {Object} configuration object with:
     - `connections`: observable of endpoint URL strings or `{ url, protocol }` objects
     - `openObserver`: [optional] receives the socket's open event
     - `errorObserver`: [optional] receives socket errors via `onNext`; when present the errored
       stream is replaced by an empty observable instead of propagating the error
     - `closingObserver`: [optional] forwarded (through a proxy) to `fromWebSocket` as its
       closing observer
 */
function RxSocketSubject(config) {
    // Keep each setting both on the instance and as a closure-local reference.
    const connections = config.connections;
    this.connections = connections;
    const openObserver = config.openObserver;
    this.openObserver = openObserver;
    const errorObserver = config.errorObserver;
    this.errorObserver = errorObserver;
    const closingObserver = config.closingObserver;
    this.closingObserver = closingObserver;

    const outbound = new Subject(); // outward facing observer side
    let socketInput = new Subject(); // feeds whichever socket is currently subscribed
    const pending = []; // messages received while the socket is not open
    let socketIsOpen = false;

    // Runs when the socket reports it has opened: notify, then flush the queue in FIFO order.
    const handleOpen = (event) => {
        socketIsOpen = true;

        if (openObserver) {
            openObserver.onNext(event);
        }

        while (pending.length > 0) {
            socketInput.onNext(pending.shift());
        }
    };

    const handleClosed = () => {
        socketIsOpen = false;
    };

    // Route outgoing traffic: straight to the socket when open, into the queue otherwise.
    // Errors and completion are passed on to the socket input subject.
    outbound.subscribe(
        (msg) => {
            if (!socketIsOpen) {
                pending.push(msg);
                return;
            }
            socketInput.onNext(msg);
        },
        (err) => {
            if (socketInput) {
                socketInput.onError(err);
            }
        },
        () => {
            if (socketInput) {
                socketInput.onCompleted();
            }
        }
    );

    // A bare string is treated as the URL with no protocol; anything else passes through.
    const toConnectionInfo = (conn) => (typeof conn === 'string' ? { url: conn, protocol: null } : conn);

    // Builds the observable for a single connection: opens the socket, relays its output
    // downstream and pipes `socketInput` into it.
    const openSocket = (conn) =>
        Observable.create((downstream) => {
            // prettier-ignore
            const closingProxy = closingObserver ? Rx.Observer.create((e) => { closingObserver.onNext(e); }) : undefined;

            const openProxy = Observer.create((e) => {
                handleOpen(e);
            });
            const socket = fromWebSocket(conn.url, conn.protocol, openProxy, closingProxy);

            const incoming = socket
                .catch((err) => {
                    if (!errorObserver) {
                        return Observable.throw(err);
                    }
                    errorObserver.onNext(err);
                    // The error observer has been told, so carry on with an empty
                    // observable rather than erroring downstream.
                    return Observable.empty();
                })
                .finally(() => {
                    handleClosed();
                })
                .subscribe(downstream);

            const outgoing = socketInput.subscribe(socket);

            return new Rx.CompositeDisposable(incoming, outgoing);
        });

    let sharedStream;
    let hasSharedStream = false;

    // Lazily builds the published/ref-counted pipeline. The pipeline's `finally` clears the
    // flag, so the next call after that builds a new pipeline and a new `socketInput`.
    const getSharedStream = () => {
        if (hasSharedStream) {
            return sharedStream;
        }

        socketInput = new Subject();
        sharedStream = connections
            .map(toConnectionInfo)
            .flatMapLatest(openSocket)
            .finally(() => {
                hasSharedStream = false;
            })
            .publish()
            .refCount();
        hasSharedStream = true;

        return sharedStream;
    };

    const inbound = Observable.create((o) => getSharedStream().subscribe(o));

    AnonymousSubject.call(this, outbound, inbound);
}

// Members layered on top of the AnonymousSubject prototype.
const socketSubjectMembers = {
    constructor: RxSocketSubject,
    // Instance-level convenience: delegates to the imported `multiplex`, passing this subject first.
    multiplex: function(responseFilter, options) {
        return multiplex(this, responseFilter, options);
    },
};

// The prototype is whatever `extend` returns for a fresh object inheriting from AnonymousSubject.
const socketSubjectBase = Object.create(AnonymousSubject.prototype);
RxSocketSubject.prototype = extend(socketSubjectBase, socketSubjectMembers);

/**
   Builds a new Socket Subject. The socket subject is an observable of socket message events and,
   at the same time, an observer: `onNext()` sends a message over the socket, while
   `onCompleted()` and `onError()` are the means to close it.

   @method create
   @param connections {Rx.Observable|Object} an observable of connection information, either endpoint
   URL strings or objects with `{ url: someUrl, protocol: someProtocol }`. Any value that is not an
   `Rx.Observable` is passed to the constructor unchanged as the config object, and the remaining
   arguments are then ignored.
   @param openObserver {Rx.Observer} [optional] an observer that will trigger
   when the underlying socket opens. Will never error or complete.
   @param errorObserver {Rx.Observer} [optional] an observer that emits errors occurring on the
   socket. Will never error or complete.
   @param closingObserver {Rx.Observer} [optional] an observer that emits when the socket is about to close.
   @return {RxSocketSubject} the newly constructed socket subject.
 */
RxSocketSubject.create = function(connections, openObserver = null, errorObserver = null, closingObserver = null) {
    // Positional arguments are bundled into a config object; a non-observable first argument
    // is used as the config as-is.
    const config =
        connections instanceof Observable
            ? { connections, openObserver, errorObserver, closingObserver }
            : connections;

    return new RxSocketSubject(config);
};

// Also expose the imported `multiplex` function as a static member of RxSocketSubject.
RxSocketSubject.multiplex = multiplex;

// The subject constructor is this module's default export.
export default RxSocketSubject;
