js/messaging/stream.js
- // NPM IMPORTS
- import assert from 'assert'
- import Baconjs from 'baconjs'
-
- // COMMON IMPORTS
- import T from '../utils/types'
-
- let context = 'common/messaging/stream'
-
-
-
- /**
- * Stream class for BaconJS stream wrapping.
- *
- * @author Luc BORIES
- * @license Apache-2.0
- */
- export default class Stream
- {
- /**
- * Create a stream.
- *
- * @returns {nothing}
- */
- constructor(arg_stream=undefined)
- {
- this.is_stream = true
-
- this._source_stream = arg_stream ? arg_stream : new Baconjs.Bus()
- this._transformed_stream = this._source_stream
-
- this.counters = {}
- this.counters.msg_count = 0
- this.counters.msg_size = 0
- this.counters.errors_count = 0
- this.counters.subscribers_count = 0
-
- this._source_stream.onError(
- () => {
- this.counters.errors_count += 1
- }
- )
- }
-
-
-
- /**
- * Get input stream.
- *
- * @returns {Baconjs.Bus}
- */
- get_source_stream()
- {
- return this._source_stream
- }
-
-
-
- /**
- * Create a Stream instance with a DOM event source stream.
- *
- * @param {string} arg_dom_elem - DOM element.
- * @param {string} arg_event_name - DOM event name.
- *
- * @returns {Stream}
- */
- static from_dom_event(arg_dom_elem, arg_event_name)
- {
- return new Stream( Baconjs.fromEvent(arg_dom_elem, arg_event_name) )
- }
-
-
-
- /**
- * Create a Stream instance with an event emitter source stream.
- *
- * @param {object} arg_emitter - event emitter.
- * @param {string} arg_event_name - event name.
- *
- * @returns {Stream}
- */
- static from_emitter_event(arg_emitter, arg_event_name)
- {
- return new Stream( Baconjs.fromEvent(arg_emitter, arg_event_name) )
- }
-
-
-
- /**
- * Get output stream.
- *
- * @returns {Baconjs.Bus}
- */
- get_transformed_stream()
- {
- return this._transformed_stream
- }
-
-
-
- /**
- * Set output stream.
- *
- * @param {Baconjs.Bus} arg_stream - transformed stream.
- *
- * @returns {Stream} - this
- */
- set_transformed_stream(arg_stream)
- {
- this._transformed_stream = arg_stream
- return this
- }
-
-
-
- /**
- * Set output stream transformation.
- *
- * @param {function} arg_stream_transformation - function (source stream)=>{ return transformed stream }.
- *
- * @returns {Stream} - this
- */
- set_transformation(arg_stream_transformation)
- {
- assert( T.isFunction(arg_stream_transformation), context + ':transform:bad function')
- const src = this._source_stream
- const tr = this._transformed_stream
-
- try {
- this._transformed_stream = arg_stream_transformation(src)
- } catch(e) {
- this._transformed_stream = tr
- console.error(context + ':set_transformation', e)
- }
-
- return this
- }
-
-
-
- /**
- * Get counters snapshot.
- *
- * @returns {object} - counters.
- */
- get_counters_snapshot()
- {
- const counters = Object.assign({}, this.counters)
-
- return counters
- }
-
-
-
- /**
- * Get counters snapshot and reset values to 0.
- */
- get_and_reset_counters_snapshot()
- {
- const counters = Object.assign({}, this.counters)
-
- this.counters.msg_count = 0
- this.counters.msg_size = 0
- this.counters.errors_count = 0
- this.counters.subscribers_count = 0
-
- return counters
- }
-
-
-
- /**
- * Push a value into the stream.
- *
- * @param {any}
- *
- * @returns {Stream} - this.
- */
- push(arg_value)
- {
- this.counters.msg_count += 1
-
- // console.log(arg_value, context + ':push:value')
- this._source_stream.push(arg_value)
- return this
- }
-
-
-
- /**
- * Subscribe to stream values.
- *
- * @param {Function} arg_handler - value handler f(value) => nothing.
- *
- * @returns {Function} - unsubscribe function
- */
- subscribe(arg_handler)
- {
- assert( T.isFunction(arg_handler), context + ':subscribe:bad handler function')
-
- this.counters.subscribers_count += 1
-
- const unsubscribe = this._transformed_stream.onValue(arg_handler)
- return () => {
- this.counters.subscribers_count -= 1
- unsubscribe()
- }
-
- // return this._transformed_stream.onValue(
- // (value) => {
- // console.log(value, context + ':subscribe:value')
- // }
- // )
- }
-
-
-
- /**
- * Subscribe to stream errors.
- *
- * @param {Function} arg_handler - value handler f(value) => nothing.
- *
- * @returns {Function} - unsubscribe function
- */
- on_error(arg_handler)
- {
- assert( T.isFunction(arg_handler), context + ':subscribe:bad handler function')
-
- this.counters.subscribers_count += 1
-
- const unsubscribe = this._transformed_stream.onError(arg_handler)
- return () => {
- this.counters.subscribers_count -= 1
- unsubscribe()
- }
- }
-
-
-
- /**
- * Debounce immediate.
- *
- * @param {integer} arg_milliseconds - number of milliseconds.
- */
- debounce_immediate(arg_milliseconds)
- {
- this._transformed_stream = this._transformed_stream.debounceImmediate(arg_milliseconds)
- return this
- }
- }