operators/merge.js

  1. import { Observable } from '../Observable';
  2. import { onSubscriptionsComplete } from '../utilities/onSubscriptionsComplete';
  /**
   * Merges any number of observables into a single observable stream.
   *
   * Every source is subscribed to when the merged observable is subscribed to.
   * `next` and `error` notifications from each source are forwarded unchanged
   * to the downstream observer. Completion of a source is delegated to
   * `onSubscriptionsComplete`, which receives the list of source subscriptions
   * and the downstream `complete` callback (presumably invoking it once every
   * subscription has finished - confirm against that utility).
   *
   * An empty list of sources completes immediately, since there is nothing
   * left to wait for.
   *
   * @memberof operators
   *
   * @param {Observable[]} sources$ the observables to merge
   * @returns {Observable} an observable emitting the values of all sources;
   * its teardown unsubscribes from every source
   */
  export const merge = function (sources$) {
    return new Observable(function ({ next, error, complete }) {
      // Reassigned once all sources are subscribed; `onComplete` reads the
      // binding lazily so it sees the full list for later completions.
      // NOTE(review): a source that completes synchronously while being
      // subscribed will observe the initial empty list - verify how
      // onSubscriptionsComplete treats that case.
      let subscriptions = [];
      const onComplete = () => onSubscriptionsComplete(subscriptions, complete);

      subscriptions = sources$.map((source$) =>
        source$.subscribe({
          next,
          error,
          complete: onComplete,
        }),
      );

      // With no sources nothing would ever trigger `onComplete`, so signal
      // completion directly instead of leaving the stream open forever.
      if (subscriptions.length === 0) {
        complete();
      }

      return () => {
        subscriptions.forEach((subscription) => {
          subscription.unsubscribe();
        });
      };
    });
  };
  // Expose the operator as a static helper: Observable.merge([a$, b$]).
  Observable.merge = merge;
  /**
   * Instance form of `merge`: merges this observable with the given ones.
   * This observable is placed first in the list handed to `merge`.
   *
   * @param {Observable[]} [otherSources$=[]] additional observables to merge
   * with this one; when omitted, only this observable is passed to `merge`
   * @returns {Observable}
   */
  Observable.prototype.merge = function (otherSources$ = []) {
    // The default guards against spreading `undefined`, which would throw.
    return merge([this, ...otherSources$]);
  };