js/messaging/stream.js
// NPM IMPORTS
import assert from 'assert'
import Baconjs from 'baconjs'
// COMMON IMPORTS
import T from '../utils/types'
let context = 'common/messaging/stream'
/**
* Stream class for BaconJS stream wrapping.
*
* @author Luc BORIES
* @license Apache-2.0
*/
export default class Stream
{
/**
* Create a stream.
*
* @returns {nothing}
*/
constructor(arg_stream=undefined)
{
this.is_stream = true
this._source_stream = arg_stream ? arg_stream : new Baconjs.Bus()
this._transformed_stream = this._source_stream
this.counters = {}
this.counters.msg_count = 0
this.counters.msg_size = 0
this.counters.errors_count = 0
this.counters.subscribers_count = 0
this._source_stream.onError(
() => {
this.counters.errors_count += 1
}
)
}
/**
* Get input stream.
*
* @returns {Baconjs.Bus}
*/
get_source_stream()
{
return this._source_stream
}
/**
* Create a Stream instance with a DOM event source stream.
*
* @param {string} arg_dom_elem - DOM element.
* @param {string} arg_event_name - DOM event name.
*
* @returns {Stream}
*/
static from_dom_event(arg_dom_elem, arg_event_name)
{
return new Stream( Baconjs.fromEvent(arg_dom_elem, arg_event_name) )
}
/**
* Create a Stream instance with an event emitter source stream.
*
* @param {object} arg_emitter - event emitter.
* @param {string} arg_event_name - event name.
*
* @returns {Stream}
*/
static from_emitter_event(arg_emitter, arg_event_name)
{
return new Stream( Baconjs.fromEvent(arg_emitter, arg_event_name) )
}
/**
* Get output stream.
*
* @returns {Baconjs.Bus}
*/
get_transformed_stream()
{
return this._transformed_stream
}
/**
* Set output stream.
*
* @param {Baconjs.Bus} arg_stream - transformed stream.
*
* @returns {Stream} - this
*/
set_transformed_stream(arg_stream)
{
this._transformed_stream = arg_stream
return this
}
/**
* Set output stream transformation.
*
* @param {function} arg_stream_transformation - function (source stream)=>{ return transformed stream }.
*
* @returns {Stream} - this
*/
set_transformation(arg_stream_transformation)
{
assert( T.isFunction(arg_stream_transformation), context + ':transform:bad function')
const src = this._source_stream
const tr = this._transformed_stream
try {
this._transformed_stream = arg_stream_transformation(src)
} catch(e) {
this._transformed_stream = tr
console.error(context + ':set_transformation', e)
}
return this
}
/**
* Get counters snapshot.
*
* @returns {object} - counters.
*/
get_counters_snapshot()
{
const counters = Object.assign({}, this.counters)
return counters
}
/**
* Get counters snapshot and reset values to 0.
*/
get_and_reset_counters_snapshot()
{
const counters = Object.assign({}, this.counters)
this.counters.msg_count = 0
this.counters.msg_size = 0
this.counters.errors_count = 0
this.counters.subscribers_count = 0
return counters
}
/**
* Push a value into the stream.
*
* @param {any}
*
* @returns {Stream} - this.
*/
push(arg_value)
{
this.counters.msg_count += 1
// console.log(arg_value, context + ':push:value')
this._source_stream.push(arg_value)
return this
}
/**
* Subscribe to stream values.
*
* @param {Function} arg_handler - value handler f(value) => nothing.
*
* @returns {Function} - unsubscribe function
*/
subscribe(arg_handler)
{
assert( T.isFunction(arg_handler), context + ':subscribe:bad handler function')
this.counters.subscribers_count += 1
const unsubscribe = this._transformed_stream.onValue(arg_handler)
return () => {
this.counters.subscribers_count -= 1
unsubscribe()
}
// return this._transformed_stream.onValue(
// (value) => {
// console.log(value, context + ':subscribe:value')
// }
// )
}
/**
* Subscribe to stream errors.
*
* @param {Function} arg_handler - value handler f(value) => nothing.
*
* @returns {Function} - unsubscribe function
*/
on_error(arg_handler)
{
assert( T.isFunction(arg_handler), context + ':subscribe:bad handler function')
this.counters.subscribers_count += 1
const unsubscribe = this._transformed_stream.onError(arg_handler)
return () => {
this.counters.subscribers_count -= 1
unsubscribe()
}
}
/**
* Debounce immediate.
*
* @param {integer} arg_milliseconds - number of milliseconds.
*/
debounce_immediate(arg_milliseconds)
{
this._transformed_stream = this._transformed_stream.debounceImmediate(arg_milliseconds)
return this
}
}