import { isEmpty, isNil, keys } from "lodash";
import {
  bufferTime,
  catchError,
  filter,
  map,
  mergeMap,
  Observable,
  of,
  scan,
  shareReplay,
  Subject,
  switchMap,
  take,
  tap,
} from "rxjs";
import { v4 as uuidv4 } from "uuid";

/** Carries the identifier that correlates a buffered request with its result. */
interface RequestIdContainer {
  requestId: string;
}

/** A request payload combined with the `requestId` that identifies it within a bundle. */
export type IdentifiableRequest<TRequest> = RequestIdContainer & TRequest;
/**
 * The outcome for one request of a bundle, matched back to the request via `requestId`.
 * `result` is `null` when no value is available. `error` is optional; `RequestBufferServiceBase`
 * sets it to the caught error on every entry when a whole bundle fails.
 */
export type IdentifiableResult<TResult> = RequestIdContainer & { result: TResult | null; error?: any };

/** Strategy used by `RequestBufferServiceBase` to execute a bundle of buffered requests. */
export interface IRequestBundlePerformer<TRequest, TResult> {
  /** Length of the buffering window in milliseconds (handed to the RxJS `bufferTime` operator). */
  get bufferInMilliseconds(): number;
  /**
   * Executes all requests of one bundle.
   * Results are matched back to the requests by `requestId`; a request for which no matching
   * result is returned is resolved with `null` by `RequestBufferServiceBase`.
   */
  performRequests: (requests: IdentifiableRequest<TRequest>[]) => Observable<IdentifiableResult<TResult>[]>;
}

/**
 * Base class for services that coalesce individual requests into bundles.
 *
 * Every request passed to `performRequest` is tagged with a generated id, buffered for
 * `bufferInMilliseconds`, and handed to the configured performer together with the other requests
 * of the same window. Each caller then receives the result whose `requestId` matches its request.
 */
export abstract class RequestBufferServiceBase<TRequest, TResult> {
  // Ids whose result has already been delivered to its caller. The next scan step drops these
  // entries from the dictionary and then clears the set.
  // NOTE(review): a caller that unsubscribes before its result arrives is never added here, so
  // that entry stays in the dictionary.
  private readonly fulfilledRequests = new Set<string>();
  private readonly requestSubject = new Subject<IdentifiableRequest<TRequest>>();

  // An observable that maintains a dictionary of results, keyed by an identifier for each request.
  // A value of `null` means the bundle failed or returned no result for that request.
  private readonly resultDictionary$: Observable<Record<string, TResult | null>>;

  /**
   * @param requestBundleDefinition Supplies the buffering window and executes the bundled requests.
   */
  constructor(private readonly requestBundleDefinition: IRequestBundlePerformer<TRequest, TResult>) {
    type ResultDictionary = Record<string, TResult | null>;

    // Built here rather than in a field initializer so that `requestBundleDefinition` is
    // guaranteed to be assigned regardless of the class-field emit mode.
    this.resultDictionary$ = this.requestSubject.pipe(
      bufferTime(this.requestBundleDefinition.bufferInMilliseconds),
      // bufferTime also emits empty windows; skip them so the performer is not called needlessly.
      filter((requests) => !isEmpty(requests)),
      mergeMap((requests) => {
        const requestIds = requests.map(({ requestId }) => requestId);
        const failAll = (error: unknown): IdentifiableResult<TResult>[] =>
          requestIds.map((requestId) => ({ requestId, result: null, error }));

        // A synchronous throw is treated like an asynchronous error: the bundle fails, but the
        // shared stream (and every other pending request) stays alive.
        let bundleResults$: Observable<IdentifiableResult<TResult>[]>;
        try {
          bundleResults$ = this.requestBundleDefinition.performRequests(requests);
        } catch (error) {
          bundleResults$ = of(failAll(error));
        }

        return bundleResults$.pipe(
          catchError((error: unknown) => of(failAll(error))),
          map((results): ResultDictionary => {
            // Index the results once; the first entry wins when an id occurs more than once.
            const entryById = new Map<string, IdentifiableResult<TResult>>();
            for (const entry of results ?? []) {
              if (!isNil(entry) && !entryById.has(entry.requestId)) {
                entryById.set(entry.requestId, entry);
              }
            }

            // Every requested id gets an entry so that no caller waits forever.
            const dictionary: ResultDictionary = {};
            for (const requestId of requestIds) {
              const entry = entryById.get(requestId);
              dictionary[requestId] = entry ? entry.result : null;
            }
            return dictionary;
          })
        );
      }),
      scan<ResultDictionary, ResultDictionary>((previous, incoming) => {
        // Carry over only results that have not been picked up by their caller yet.
        const pending: ResultDictionary = {};
        for (const requestId of keys(previous)) {
          if (!this.fulfilledRequests.has(requestId)) {
            pending[requestId] = previous[requestId];
          }
        }

        this.fulfilledRequests.clear();
        return { ...pending, ...incoming };
      }, {}),
      shareReplay(1)
    );
  }

  /**
   * Queues `request` for the next bundle and resolves with its individual result.
   *
   * The returned observable is cold: the request is queued on subscription. It emits exactly one
   * value and then completes.
   *
   * @param request The request payload to bundle.
   * @returns The result for this request, or `null` when the bundle failed or did not contain a
   *          result for it.
   */
  performRequest(request: TRequest): Observable<TResult | null> {
    return of(uuidv4()).pipe(
      switchMap(
        (requestId) =>
          new Observable<TResult | null>((subscriber) => {
            // Subscribe to the result stream BEFORE emitting the request. shareReplay connects to
            // requestSubject only on its first subscriber and a Subject does not replay, so
            // emitting first would silently drop the very first request.
            const subscription = this.resultDictionary$
              .pipe(
                filter((dictionary) => Object.prototype.hasOwnProperty.call(dictionary, requestId)),
                map((dictionary) => dictionary[requestId]),
                tap(() => this.fulfilledRequests.add(requestId))
              )
              .subscribe(subscriber);

            this.requestSubject.next({ requestId, ...request });
            return subscription;
          })
      ),
      take(1)
    );
  }
}
