import { ApolloLink, FetchResult, NextLink, Observable, Observer, Operation } from '@apollo/client/core';
import Pusher from 'pusher-js';

// Turn `subscribe` arguments into an observer-like thing, see getObserver
// https://github.com/apollographql/subscriptions-transport-ws/blob/master/src/client.ts#L329-L343
function getObserver<T>(
    observerOrNext: Observer<T> | ((v: T) => void),
    onError: ((error: any) => void) | undefined,
    onComplete: (() => void) | undefined,
) {
    if (typeof observerOrNext === 'function') {
        // Duck-type an observer
        return {
            next: (v: T) => observerOrNext(v),
            error: (e: any) => onError && onError(e),
            complete: () => onComplete && onComplete(),
        };
    }
    // Make an object that calls to the given object, with safety checks
    return {
        next: (v: T) => observerOrNext.next && observerOrNext.next(v),
        error: (e: any) => observerOrNext.error && observerOrNext.error(e),
        complete: () => observerOrNext.complete && observerOrNext.complete(),
    };

}

export class PusherLink extends ApolloLink {
    private pusher: Pusher;

    constructor(options: { pusher: Pusher; }) {
        super();

        this.pusher = options.pusher;
    }

    public override request(operation: Operation, forward: NextLink) {
        const subscribeObservable = new Observable<FetchResult>(() => {
            //
        });

        // Capture the super method
        const prevSubscribe = subscribeObservable.subscribe.bind(
            subscribeObservable,
        );

        // Override subscribe to return an `unsubscribe` object, see
        // https://github.com/apollographql/subscriptions-transport-ws/blob/master/src/client.ts#L182-L212
        subscribeObservable.subscribe = (
            observerOrNext: Observer<FetchResult> | ((value: FetchResult) => void),
            onError?: (error: any) => void,
            onComplete?: () => void,
        ) => {
            if (typeof observerOrNext === 'function') {
                prevSubscribe(observerOrNext, onError, onComplete);
            }

            const observer = getObserver(observerOrNext, onError, onComplete);

            let subscriptionChannel: string;

            forward(operation).subscribe({
                next: (data) => {
                    // If the operation has the subscription channel, it's a subscription
                    subscriptionChannel = data?.extensions?.lighthouse_subscriptions.channel ?? null;

                    // No subscription found in the response, pipe data through
                    if (! subscriptionChannel) {
                        observer.next(data);
                        observer.complete();

                        return;
                    }

                    this.subscribeToChannel(subscriptionChannel, observer);
                },
            });

            // Return an object that will unsubscribe_if the query was a subscription
            return {
                closed: false,
                unsubscribe: () => {
                    if (subscriptionChannel) {
                        this.unsubscribeFromChannel(subscriptionChannel);
                    }
                },
            };
        };

        return subscribeObservable;
    }

    public subscribeToChannel(
        subscriptionChannel: string,
        observer: { next: any; error?: (e: any) => void | undefined; complete: any; },
    ) {
        this.pusher
            .subscribe(subscriptionChannel)
            .bind('lighthouse-subscription', (payload: { more?: any; result?: any; }) => {
                if (! payload.more) {
                    this.unsubscribeFromChannel(subscriptionChannel);

                    observer.complete();
                }

                const { result } = payload;

                if (result) {
                    observer.next(result);
                }
            });
    }

    public unsubscribeFromChannel(subscriptionChannel: string) {
        this.pusher.unsubscribe(subscriptionChannel);
    }
}
