import { DataSource } from '@angular/cdk/collections';
import { Observable, BehaviorSubject, Subscription, merge } from 'rxjs';
import { map } from 'rxjs/operators';

import { MatPaginator, MatSort, StreamType } from './models';
import { LocalDataStream, RemoteDataStream } from './';

export class TableDataSource<T> extends DataSource<T> {
    set paginator(matPaginator: MatPaginator) {
        if (matPaginator) {
            this.matPaginator = matPaginator;

            if (!this.paginatorSubscription) {
                this.paginatorSubscription = this.paginator.page.subscribe(
                    (event) => this.paginator$.next(event)
                );
            }
        }
    }

    get paginator(): MatPaginator {
        return this.matPaginator || null;
    }

    set sorter(matSorter: MatSort) {
        if (matSorter) {
            this.matSorter = matSorter;

            if (!this.sorterSubscription) {
                this.sorterSubscription = this.sorter.sortChange.subscribe(
                    (event) => this.sorter$.next(event)
                );
            }
        }
    }

    get sorter(): MatSort {
        return this.matSorter || null;
    }

    set sortBy(sortBy: Function) {
        this.sort = sortBy;
    }

    get sortBy(): Function {
        return this.sort || null;
    }

    set stream(dataStream: LocalDataStream<T> | RemoteDataStream<T>) {
        this.dataStream = dataStream;
    }

    get stream(): LocalDataStream<T> | RemoteDataStream<T> {
        return this.dataStream;
    }

    get length(): number {
        return this.dataStream.data.length;
    }

    private dataStream: LocalDataStream<T> | RemoteDataStream<T>;

    private matPaginator: MatPaginator;
    private matSorter: MatSort;
    private sort: Function;

    private paginator$: BehaviorSubject<any> = new BehaviorSubject(null);
    private paginatorSubscription: Subscription;
    private sorter$: BehaviorSubject<any> = new BehaviorSubject(null);
    private sorterSubscription: Subscription;

    private changeSubscription: Subscription;

    constructor() {
        super();
    }

    /** Connect function called by the table to retrieve one stream containing the data to render. */
    public connect(): Observable<T[]> {
        this.changeSubscription = merge(
            this.paginator$,
            this.sorter$
        )
            .subscribe((data) => {
                if (data) {
                    this.dataStream.load();
                }
            });

        return merge(
            this.dataStream.dataChange$,
        )
            .pipe(
                map((stream) => {
                    if (stream) {
                        if (stream.type === StreamType.LOCAL) {
                            const data = this.dataStream.data;
                            const startIndex = (this.paginator) ? (this.paginator.pageIndex * this.paginator.pageSize) : 0;
                            const pageSize = (this.paginator) ? this.paginator.pageSize : stream.length;

                            return this.sortData(data).splice(startIndex, pageSize);
                        }

                        if (stream.type === StreamType.REMOTE) {
                            if (this.paginator) {
                                this.paginator.length = stream.length;

                                if (stream.pageIndex !== null) {
                                    this.paginator.pageIndex = stream.pageIndex;
                                }

                                if (stream.pageSize !== null) {
                                    this.paginator.pageSize = stream.pageSize;
                                }
                            }

                            return this.dataStream.data;
                        }
                    }

                    return [];
                })
            );

        //   .pipe(
        //     startWith({}),
        //     switchMap(() => {
        //       this.isLoadingResults = true;
        //       return this.exampleDatabase!.getRepoIssues(
        //         this.sort.active, this.sort.direction, this.paginator.pageIndex);
        //     }),
        //     map(data => {
        //       // Flip flag to show that loading has finished.
        //       this.isLoadingResults = false;
        //       this.isRateLimitReached = false;
        //       this.resultsLength = data.total_count;

        //       return data.items;
        //     }),
        //     catchError(() => {
        //       this.isLoadingResults = false;
        //       // Catch if the GitHub API has reached its rate limit. Return empty data.
        //       this.isRateLimitReached = true;
        //       return observableOf([]);
        //     })
    }

    public disconnect() {
        if (this.paginatorSubscription) {
            this.paginatorSubscription.unsubscribe();
            this.paginatorSubscription = null;
        }

        if (this.sorterSubscription) {
            this.sorterSubscription.unsubscribe();
            this.sorterSubscription = null;
        }

        if (this.changeSubscription) {
            this.changeSubscription.unsubscribe();
            this.changeSubscription = null;
        }

        this.paginator$.complete();
        this.sorter$.complete();

        this.dataStream.close();
    }

    private sortData(data: T[]): T[] {
        if (!this.sorter || !this.sortBy || !this.sorter.active || this.sorter.direction === '') {
            return data;
        }

        return data.sort((a: T, b: T) => this.sortBy(this.sorter, a, b));
    }
}
