RxJs избегают внешнего состояния, но все еще имеют доступ к предыдущим значениям

Я использую RxJs для прослушивания amqp queu (не очень важно).

У меня есть функция createConnection которая возвращает Observable которая испускает новый объект соединения . Как только у меня есть соединение, я хочу отправлять сообщения через него каждые 1000 мс, а после 10 сообщений я хочу закрыть соединение.

Я пытаюсь избежать внешнего состояния, но если я не храню соединение во внешней переменной, как его закрыть? См. Раздел «Начало работы с соединением», затем « flatMap и «Push», поэтому после нескольких цепей у меня больше нет объекта соединения.

Это не мой stream, но представьте себе что-то вроде этого:

 createConnection() .flatMap(connection => connection.createChannel()) .flatMap(channel => channel.send(message)) .do(console.log) .subscribe(connection => connection.close()) <--- obviously connection isn't here 

Теперь я понимаю, что глупо это делать, но теперь, как мне получить доступ к соединению? Я мог бы, конечно, начать с var connection = createConnection()

и позже как-то присоединяйся к этому. Но как мне это сделать? Я даже не знаю, как правильно задать этот вопрос. В нижней части, что у меня есть наблюдаемое, которое испускает соединение, после того, как соединение открыто, я хочу, чтобы наблюдаемое, которое излучает сообщения каждые 1000 мс (с помощью take(10) ), затем закрывает соединение

    Прямой ответ на ваш вопрос: «вы можете переносить его через каждый шаг». Например, вы можете заменить эту строку

     .flatMap(connection => connection.createChannel()) 

    с этим:

     .flatMap(connection => ({ connection: connection, channel: connection.createChannel() })) 

    и сохранить доступ к соединению полностью вниз.

    Но есть еще один способ сделать то, что вы хотите сделать. Предположим, что ваши функции createConnection и createChannel выглядят примерно так:

     function createConnection() { return Rx.Observable.create(observer => { console.log('creating connection'); const connection = { createChannel: () => createChannel(), close: () => console.log('disposing connection') }; observer.onNext(connection); return Rx.Disposable.create(() => connection.close()); }); } function createChannel() { return Rx.Observable.create(observer => { const channel = { send: x => console.log('sending message: ' + x) }; observer.onNext(channel); // assuming no cleanup here, don't need to return disposable }); } 

    createConnectioncreateChannel , но мы сосредоточимся на первом) возвращает холодный наблюдаемый; каждый абонент получит свой собственный stream связи, содержащий одно соединение, и когда эта подписка истечет, логика удаления будет вызываться автоматически.

    Это позволяет сделать что-то вроде этого:

     const subscription = createConnection() .flatMap(connection => connection.createChannel()) .flatMap(channel => Rx.Observable.interval(1000).map(i => ({ channel: channel, data: i }))) .take(10) .subscribe(x => x.channel.send(x.data)) ; 

    Фактически вам не нужно устанавливать подписку на очистку; после выполнения take(10) вся цепочка завершится, и будет активирована очистка. Единственная причина, по которой вам нужно будет прямо называть подписку на подписку, – это то, что вы хотели оторвать вещи до того, как 10 1000 мс были в очереди.

    Обратите внимание, что это решение также содержит экземпляр прямого ответа на ваш вопрос: мы вывозим канал вниз по линии, чтобы мы могли использовать его в onNext lambda, переданном на вызов подписки (обычно это такой код).

    Вот работа: https://jsbin.com/korihe/3/edit?js,console,output

    Этот код дал мне ошибку, потому что flatmap ждет наблюдаемого <(T)> и ({connection: connection, channel: connection.createChannel ()} ), это объект.

    .flatMap(connection => ({ connection: connection, channel: connection.createChannel() }))

    вместо этого вы можете использовать оператор combLatest

    .flatMap(connection => Observable.combineLatest( Observable.of(connection), connection.createChannel(), (connection, channel) => { ... code .... });