Reference Source

js/messaging/bus_engine.js

// NPM IMPORTS
import assert from 'assert'

// COMMON IMPORTS
import T        from '../utils/types'
import Instance from '../base/instance'


let context = 'common/messaging/bus_engine'



/**
 * Interface for bus engine.
 * @abstract
 * 
 * @author Luc BORIES
 * @license Apache-2.0
 * 
 * @example
* API:
*   ->constructor(arg_name, arg_settings, arg_log_context).
* 
*   ->channel_list()
*   ->channel_add(arg_channel)
*   ->channel_send(arg_channel, arg_payload)
*   ->channel_on(arg_channel, arg_handler)
*   ->channel_transform(arg_in_channel, arg_out_channel, arg_xform_handler).
* 
 */
export default class BusEngine extends Instance
{
	/**
	 * Create a bus.
	 * 
	 * @param {string} arg_name - instance name.
	 * @param {object} arg_settings - settings.
	 * @param {string} arg_log_context - trace context.
	 * @param {LoggerManager} arg_logger_manager - logger manager object (optional).
	 * 
	 * @returns {nothing}
	 */
	constructor(arg_name, arg_settings, arg_log_context=context, arg_logger_manager=undefined)
	{
		super('buses', 'BusEngine', arg_name, arg_settings, arg_log_context, arg_logger_manager)
		
		/**
		 * Class type flag.
		 * @type {boolean}
		 */
		this.is_bus_engine = true

		this._engine_type      = this.get_setting_js('type', 'Client')
		this._engine_protocole = this.get_setting_js('protocole', 'https')
		this._engine_host      = this.get_setting_js('host', 'localhost')
		this._engine_port      = this.get_setting_js('port', '9999')
		this._engine_url       = this._engine_protocole + '://' + this._engine_host + ':' + this._engine_port
	}
	

	
	/**
	 * List engine channels.
	 * 
	 * @returns {array}
	 */
	channel_list()
	{
		throw new Error('channel_list:Not yet implemented')
	}
	

	
	/**
	 * Add a channel.
	 * 
	 * @param {string} arg_channel - channel name.
	 * 
	 * @returns {nothing}
	 */
	channel_add(arg_channel)
	{
		assert( T.isString(arg_channel), this.get_context() + ':channel_add:bad channel name')
		throw new Error('channel_add:Not yet implemented')
	}



	/**
	 * Send a message into a channel.
	 * 
	 * @param {string} arg_channel - channel name string.
	 * @param {object} arg_payload - payload data object.
	 * 
	 * @returns {nothing}
	 */
	channel_send(arg_channel, arg_payload)
	{
		assert( T.isString(arg_channel), this.get_context() + ':channel_send:bad channel name')
		assert( arg_payload, this.get_context() + ':channel_send:bad payload data')
		throw new Error('channel_send:Not yet implemented')
	}
	
	
	
	/**
	 * Subscribe on channel inputs.
	 * 
	 * @param {string} arg_channel - channel name string.
	 * @param {function} arg_handler - f(payload):nothing.
	 * @param {function} arg_predicate - p(payload):boolean (optional).
	 * 
	 * @returns {function} - unsubscribe function.
	 */
	/* eslint no-unused-vars: "off" */
	channel_on(arg_channel, arg_handler, arg_predicate=undefined)
	{
		assert( T.isString(arg_channel), this.get_context() + ':channel_on:bad channel name')
		assert( T.isFunction(arg_handler), this.get_context() + ':channel_on:bad handler function')
		throw new Error('channel_on:Not yet implemented')
	}



	/**
	 * Transform payload of input channel to output channel.
	 * 
	 * @param {string} arg_in_channel - input channel name.
	 * @param {string} arg_out_channel - output channel name.
	 * @param {function} arg_handler - payload tranform function.
	 * 
	 * @returns {nothing}
	 */
	channel_transform(arg_in_channel, arg_out_channel, arg_handler)
	{
		assert( T.isString(arg_in_channel), this.get_context() + ':channel_transform:bad input channel name')
		assert( T.isString(arg_out_channel), this.get_context() + ':channel_transform:bad output channel name')
		assert( T.isFunction(arg_handler), this.get_context() + ':channel_transform:bad transform function')
		const handler = (payload)=>{
			const xform_payload = arg_handler(payload)
			this.channel_send(arg_out_channel, xform_payload)
		}
		this.channel_on(arg_in_channel, handler)
	}
}