import { Observable, Subscriber, type Observer } from 'rxjs';
import { share } from 'rxjs/operators';

export function fromMessagePort<Down = unknown, Up = unknown>(
  port: Pick<Worker, 'onmessage' | 'postMessage'>,
): [Observable<Down>, Observer<Up>] {
  return [createDownstreamObservable<Down>(port), createUpstreamObserver<Up>(port)];
}

/**
 * @todo
 * - implement subscription through addEventListener
 * - implement teardown logic through removeEventListener
 * - handle start at subscription
 * - handle close event to close stream
 * - are port message errors terminal ?
 *   - then handle error on stream
 *   - else unite custom {[Symbol(porterror)]: message} to Down interface
 */
export function createDownstreamObservable<Down = any>(
  port: Pick<Worker, 'onmessage'>,
): Observable<Down> {
  const downstream$ = new Observable<Down>(function subscribe(observer) {
    port.onmessage = function onmessage(ev) {
      observer.next(ev.data);
    };
  });
  return downstream$.pipe(share());
}

/**
 * @todo
 * - what should be done on error or complete
 * - port may be closed (need resilience)
 * - keep ignoring
 */
export function createUpstreamObserver<Up = any>(
  port: Pick<Worker, 'postMessage'>,
): Subscriber<Up> {
  function next(message: Up) {
    port.postMessage(message);
  }
  return new Subscriber({ next });
}
