// js/messaging/stream_bus_engine.js
// NPM IMPORTS
import assert from 'assert'
// COMMON IMPORTS
import T from '../utils/types'
import BusEngine from './bus_engine'
import Stream from './stream'
// Default trace context, used by the StreamBusEngine constructor when the caller supplies no arg_log_context.
const context = 'common/messaging/stream_bus_engine'
/**
* Stream based bus engine class for message bus client or server.
*
* @author Luc BORIES
* @license Apache-2.0
*
* @example
* API:
* ->constructor(arg_name,arg_settings,arg_log_context,arg_logger_manager).
*
* ->channel_list():array - List engine channels.
* ->channel_add(arg_channel):nothing - Add a channel.
* ->channel_send(arg_channel, arg_payload):nothing - Send a message into a channel.
* ->channel_on(arg_channel, arg_handler, arg_predicate):nothing - Subscribe on channel inputs (arg_predicate is optional).
* ->channel_transform(arg_in_channel, arg_out_channel, arg_xform_handler):nothing - Transform payload of input channel to output channel.
*
*/
export default class StreamBusEngine extends BusEngine
{
/**
* Create a bus.
*
* @param {string} arg_name - instance name.
* @param {object} arg_settings - settings.
* @param {string} arg_log_context - trace context.
* @param {LoggerManager} arg_logger_manager - logger manager object (optional).
*
* @returns {nothing}
*/
constructor(arg_name, arg_settings, arg_log_context=context, arg_logger_manager=undefined)
{
super(arg_name, arg_settings, arg_log_context, arg_logger_manager)
this.is_stream_bus_engine = true
this._channels = {}
// this._channels['default'] = new Stream()
// this.channel_add('default')
}
/**
* List engine channels.
*
* @returns {array}
*/
channel_list()
{
return Object.keys(this._channels)
}
/**
* Add a channel.
*
* @param {string} arg_channel - channel name.
*
* @returns {nothing}
*/
channel_add(arg_channel)
{
assert( T.isString(arg_channel), this.get_context() + ':channel_add:bad channel name')
this._channels[arg_channel] = new Stream()
}
/**
* Send a message into a channel.
*
* @param {string} arg_channel - channel name string.
* @param {object} arg_payload - payload data object.
*
* @returns {nothing}
*/
channel_send(arg_channel, arg_payload)
{
assert( T.isString(arg_channel), this.get_context() + ':channel_send:bad channel name')
assert( arg_payload, this.get_context() + ':channel_send:bad payload data')
assert( arg_channel in this._channels, this.get_context() + ':channel_send:channel [' + arg_channel + '] stream not found')
this._channels[arg_channel].push(arg_payload)
}
/**
* Subscribe on channel inputs.
*
* @param {string} arg_channel - channel name string.
* @param {function} arg_handler - f(payload):nothing
* @param {function} arg_predicate - p(payload):boolean
*
* @returns {nothing}
*/
channel_on(arg_channel, arg_handler, arg_predicate=undefined)
{
assert( T.isString(arg_channel), this.get_context() + ':channel_on:bad channel name on engine [' + this.get_name() + ']')
assert( T.isFunction(arg_handler), this.get_context() + ':channel_on:bad handler function on engine [' + this.get_name() + '] for channel [' + arg_channel + ']')
assert( arg_channel in this._channels, this.get_context() + ':channel_on:channel [' + arg_channel + '] stream not found on engine [' + this.get_name() + ']')
this._channels[arg_channel].subscribe(
(value) => {
// console.log(context + ':subscribe:bus[' + this.get_name() + '] channel [' + arg_channel + '] received value', value)
// FILTER BY PREDICATE
if ( T.isFunction(arg_predicate) )
{
if ( arg_predicate(value) )
{
arg_handler(value)
return
}
}
// NO VALID PREDICATE
arg_handler(value)
}
)
}
}