// Reference Source
//
// js/messaging/message_bus.js

// NPM IMPORTS
import assert from 'assert'
import {format} from 'util'

// COMMON IMPORTS
import T                  from '../utils/types'
import Instance           from '../base/instance'
import DistributedMessage from '../base/distributed_message'
import DistributedLogs    from '../base/distributed_logs'
import DistributedMetrics from '../base/distributed_metrics'
import Stream             from './stream'


// Default trace context handed to the Instance constructor when the caller gives none;
// also used as the prefix of the console warnings emitted by MessageBus.
const context = 'common/messaging/message_bus'
// Default recipients page size, and the fallback used when a requested size is rejected.
// A requested page size must be strictly lower than this value to be accepted.
const MAX_PAGE_SIZE = 999999
// A requested page size must be strictly greater than this value to be accepted.
const MIN_PAGE_SIZE = 9



/**
 * @file Message bus class.
 * 
 * @author Luc BORIES
 * @license Apache-2.0
 */
export default class MessageBus extends Instance
{
	/**
	 * Create a bus.
	 * 
	 * API:
	 *   ->constructor(arg_name, arg_bus_engine, arg_settings, arg_log_context).
	 * 
	 * 	Engine API:
	 *   ->get_bus_engine():BusEngine
	 * 
	 *  Stream API:	
	 *   ->get_input_stream():Stream - get input stream to populate the bus.
	 * 
	 * 	Message API:
	 * 	 ->msg_recipients(arg_page_size=999999, arg_page_index=0):object - get paged recipients list.
	 *   ->msg_post(arg_msg:DistributedMessage):boolean - send a DistributedMessage instance.
	 * 
	 *   ->msg_subscribe(arg_channel:string, arg_handler:f(msg), arg_filter:string|function):unsubscribe function - subscribe to messages of the bus.
	 * 
	 *   ->msg_register(arg_distributed_instance,arg_channel,arg_method='receive_msg',arg_alias=undefined):function - register a DistributedInstance recipient.
	 *   ->msg_has_recipient(arg_name):boolean - test if the bus has given named recipient.
	 *   ->msg_add_recipient(arg_name, arg_instance='remote') - add a bus recipient.
	 *   ->msg_remove_recipient(arg_name) - remove a bus recipient.
	 * 
	 *   ->normalize_msg(arg_msg):DistributedMessage|undefined - Normalize given message.
	 * 
	 * @param {string}    arg_name - instance name.
	 * @param {BusEngine} arg_bus_engine - bus engine.
	 * @param {object}    arg_settings - settings.
	 * @param {string}    arg_log_context - trace context (optional, the module context is used when falsy).
	 * 
	 * @returns {nothing}
	 */
	constructor(arg_name, arg_bus_engine, arg_settings, arg_log_context)
	{
		super('buses', 'MessageBus', arg_name, arg_settings, arg_log_context ? arg_log_context : context)
		
		/**
		 * Class type flag.
		 * @type {boolean}
		 */
		this.is_message_bus = true

		// ENGINE USED TO SEND AND RECEIVE CHANNEL MESSAGES (VALIDATED LAZILY BY THE METHODS USING IT)
		this._bus_engine = arg_bus_engine

		this._input_stream = new Stream()

		// RECIPIENT NAME -> INSTANCE | 'browser' | 'remote'
		this._recipients = {}
		// RECIPIENT NAME -> MESSAGE HANDLER FUNCTION
		this._recipients_handlers = {}
	}
	

	
	/**
	 * Get bus engine.
	 * 
	 * @returns {BusEngine}
	 * 
	 * @throws {AssertionError} - if the engine given at construction is not a bus engine.
	 */
	get_bus_engine()
	{
		assert( T.isObject(this._bus_engine) && this._bus_engine.is_bus_engine, this.get_context() + ':get_bus_engine:bad bus engine instance')
		return this._bus_engine
	}
	
	
	
	/**
	 * Get stream to populate the bus.
	 * 
	 * @returns {Stream} - input bus stream.
	 */
	get_input_stream()
	{
		return this._input_stream
	}



	/**
	 * Get paged recipients list.
	 * 
	 * A page size is accepted only when it is a number strictly between MIN_PAGE_SIZE and MAX_PAGE_SIZE,
	 * otherwise MAX_PAGE_SIZE is used. A page index is accepted only when it is a number >= 0, otherwise 0 is used.
	 * A page index beyond the last page gives an empty page_values array.
	 * 
	 * @param {integer} arg_page_size  - recipients list page size.
	 * @param {integer} arg_page_index - recipients list page index (0 based).
	 * 
	 * @returns {object} - paged result { count:recipients count, page_size:..., page_count:..., page_index:..., page_values:[] }
	 */
	msg_recipients(arg_page_size=MAX_PAGE_SIZE, arg_page_index=0)
	{
		const recipients = Object.keys(this._recipients)
		const count = recipients.length
		const page  = T.isNumber(arg_page_size) && arg_page_size > MIN_PAGE_SIZE && arg_page_size < MAX_PAGE_SIZE ? arg_page_size : MAX_PAGE_SIZE
		const index = T.isNumber(arg_page_index) && arg_page_index >= 0 ? arg_page_index : 0

		// SLICE END IS AN ABSOLUTE POSITION, NOT A LENGTH
		const first = index * page
		const values = recipients.slice(first, first + page)

		return {
			count:count,
			page_size:page,
			// FULL PAGES PLUS ONE PARTIAL PAGE IF ANY
			page_count:Math.ceil(count / page),
			page_index:index,
			page_values:values
		}
	}



	/**
	 * Send a message into a channel.
	 * 
	 * The bus name is appended to the message buses path: a message which already went through this bus is rejected.
	 * A message targeting a local instance or a 'browser' recipient is given to the registered recipient handler,
	 * every other message is sent through the bus engine.
	 * Errors are not thrown: they are reported with console.warn and the method returns false.
	 * 
	 * @param {DistributedMessage} arg_msg - distributed message instance.
	 * 
	 * @returns {boolean} - success or failure.
	 */
	msg_post(arg_msg)
	{
		try{
			assert( T.isObject(arg_msg) && arg_msg.is_distributed_message, this.get_context() + ':msg_post:bad message instance')
			assert( T.isObject(this._bus_engine) && this._bus_engine.is_bus_engine, this.get_context() + ':msg_post:bad bus engine instance')
			
			const channel = arg_msg.get_channel()
			assert( T.isString(channel), this.get_context() + ':msg_post:bad channel name')

			// ADD BUS NAME TO MESSAGE PATH
			const step_name = this.get_name()
			if (arg_msg.has_buses_step(step_name))
			{
				// DEBUG
				const payload_str = JSON.stringify(arg_msg.get_payload())
				this.debug( format('msg_post:bus step exists:engine=[%s] step=[%s] channel=[%s] sender=[%s] target=[%s] data=[%s]', this.get_name(), step_name, arg_msg.get_channel(), arg_msg.get_sender(), arg_msg.get_target(), payload_str) )
				
				return false
			}
			arg_msg.add_buses_step(step_name)

			// DEBUG
			const payload_str = JSON.stringify(arg_msg.get_payload())
			this.debug( format('msg_post:new bus step:engine=[%s] step=[%s] channel=[%s] sender=[%s] target=[%s] data=[%s]', this.get_name(), step_name, arg_msg.get_channel(), arg_msg.get_sender(), arg_msg.get_target(), payload_str) )
			
			// LOCAL RECIPIENT
			const target = arg_msg.get_target()
			if ( this.msg_has_recipient(target) )
			{
				const recipient = this._recipients[target]
				if ( T.isObject(recipient) || recipient == 'browser' )
				{
					this.debug( format('msg_post:recipient is local and is an instance or browser:engine=[%s] step=[%s] channel=[%s] sender=[%s] target=[%s]', this.get_name(), step_name, arg_msg.get_channel(), arg_msg.get_sender(), arg_msg.get_target() ) )
					const handler = this._recipients_handlers[target]
					if ( T.isFunction(handler) )
					{
						handler(arg_msg)
						return true
					}
					
					console.warn(context + ':msg_post:failure with error:not or bad handler for local recipient')
					return false
				}

				// KNOWN RECIPIENT WITHOUT LOCAL INSTANCE ('remote'): FALL THROUGH TO THE ENGINE
				this.debug( format('msg_post:recipient is local but not an instance:engine=[%s] step=[%s] channel=[%s] sender=[%s] target=[%s]', this.get_name(), step_name, arg_msg.get_channel(), arg_msg.get_sender(), arg_msg.get_target() ) )
			}

			// REMOTE RECIPIENT
			this._bus_engine.channel_send(channel, arg_msg)

			return true
		} catch(e) {
			console.warn(context + ':msg_post:failure with error:' + e)
			return false
		}
	}
	
	
	
	/**
	 * Subscribe on channel inputs.
	 * 
	 * A string filter is a target name: it is registered as a bus recipient, only messages with this target
	 * are given to the handler and the wrapped handler becomes the recipient handler used by msg_post.
	 * A function filter is given as is to the bus engine as the subscription predicate.
	 * 
	 * @param {string} arg_channel - channel name string.
	 * @param {function} arg_handler - f(msg) called with the normalized message.
	 * @param {string|function} arg_filter - filter string or function.
	 * 
	 * @returns {function} - unsubscribe function (result of the bus engine channel_on method).
	 * 
	 * @throws {AssertionError} - on bad handler, bad bus engine or empty string filter.
	 */
	msg_subscribe(arg_channel, arg_handler, arg_filter=undefined)
	{
		assert( T.isFunction(arg_handler), this.get_context() + ':msg_subscribe:bad handler function')
		assert( T.isObject(this._bus_engine) && this._bus_engine.is_bus_engine, this.get_context() + ':msg_subscribe:bad bus engine instance')
		
		// DEBUG
		this.debug( format('msg_subscribe:engine=[%s] filter=[%s] channel=[%s]', this.get_name(), arg_filter, arg_channel) )

		// REGISTER RECIPIENT
		if ( T.isString(arg_filter) )
		{
			this.msg_add_recipient(arg_filter)
		}
		
		// MESSAGE HANDLER
		const msg_handler = (arg_msg)=>{
			const msg = this.normalize_msg(arg_msg)

			// NOT A DISTRIBUTED MESSAGE: NOTHING CAN BE DISPATCHED
			if (! msg)
			{
				console.warn(context + ':msg_subscribe:msg_handler:bad message instance')
				return
			}

			// CHANNEL NAME FILTERING
			const channel = msg.get_channel()
			assert( T.isNotEmptyString(channel), this.get_context() + ':msg_subscribe:msg_handler:bad channel name')
			const same_channel = channel == arg_channel

			// DEBUG
			this.debug( format('msg_subscribe:same channel:engine=[%s] filter=[%s] channel=[%s] handler channel=[%s] same channel[%d]', this.get_name(), arg_filter, channel, arg_channel, same_channel) )
				
			if (! same_channel)
			{
				// DEBUG
				this.debug( format('msg_subscribe:skip channel:engine=[%s] filter=[%s] channel=[%s] sender=[%s] target=[%s] handler channel=[%s]', this.get_name(), arg_filter, channel, msg.get_sender(), msg.get_target(), arg_channel) )
				return
			}

			// TARGET NAME FILTERING
			if ( T.isNotEmptyString(arg_filter) && arg_filter != msg.get_target() )
			{
				// DEBUG
				this.debug( format('msg_subscribe:skip target:engine=[%s] filter=[%s] channel=[%s] sender=[%s] target=[%s] handler channel=[%s]', this.get_name(), arg_filter, channel, msg.get_sender(), msg.get_target(), arg_channel) )
				return
			}

			// THE HANDLER IS CALLED IN BOTH CASES BELOW, ONLY THE TRACE DIFFERS
			const payload_str = JSON.stringify(msg.get_payload())
			if ( this.msg_has_recipient( msg.get_target() ) )
			{
				// DEBUG
				this.debug( format('msg_subscribe:msg handled with filter:engine=[%s] filter=[%s] channel=[%s] sender=[%s] target=[%s] data=[%s]', this.get_name(), arg_filter, msg.get_channel(), msg.get_sender(), msg.get_target(), payload_str) )
			}
			else
			{
				// NO FILTERING
				// DEBUG
				this.debug( format('msg_subscribe:msg handled without filter:engine=[%s] filter=[%s] channel=[%s] sender=[%s] target=[%s] data=[%s]', this.get_name(), '', msg.get_channel(), msg.get_sender(), msg.get_target(), payload_str) )
			}
			
			arg_handler(msg)
		}
		
		// USE THE WRAPPED HANDLER FOR LOCAL DELIVERIES OF msg_post (OWN KEYS ONLY, NOT Object.prototype ONES)
		if ( T.isString(arg_filter) && Object.prototype.hasOwnProperty.call(this._recipients_handlers, arg_filter) )
		{
			this._recipients_handlers[arg_filter] = msg_handler
		}

		// ENGINE PREDICATE: RECEIVES THE ENGINE MESSAGE, WHICH MAY NOT BE NORMALIZED YET (SO NO GETTER HERE)
		const fn_filter = T.isFunction(arg_filter) ? arg_filter : undefined
		const str_filter = T.isString(arg_filter) ? (msg)=>{ return arg_filter == msg._target } : undefined
		const predicate = fn_filter || str_filter
		return this._bus_engine.channel_on(arg_channel, msg_handler, predicate)
	}



	/**
	 * Register a DistributedInstance recipient.
	 * 
	 * The instance is added to the bus recipients under its alias (or its name without alias)
	 * and its handler method is subscribed on the channel with this name as target filter.
	 * 
	 * @param {DistributedInstance} arg_distributed_instance - distributed recipient instance.
	 * @param {string} arg_channel - channel name string.
	 * @param {string} arg_method  - message handler method name string.
	 * @param {string} arg_alias   - instance alias name string (optional, default undefined).
	 * 
	 * @returns {function} - unsubscribe function.
	 * 
	 * @throws {AssertionError} - on bad instance or when arg_method is not the name of an instance method.
	 */
	msg_register(arg_distributed_instance, arg_channel, arg_method='receive_msg', arg_alias=undefined)
	{
		assert( T.isObject(arg_distributed_instance) && arg_distributed_instance.is_distributed_instance, this.get_context() + ':msg_register:bad distributed instance.')
		// THE MEMBER SHOULD BE CALLABLE, ELSE bind() BELOW FAILS WITH AN UNCLEAR ERROR
		assert( T.isString(arg_method) && (arg_method in arg_distributed_instance) && typeof arg_distributed_instance[arg_method] == 'function', this.get_context() + ':msg_register:bad method name [' + arg_method + '].')
		
		const name = T.isNotEmptyString(arg_alias) ? arg_alias : arg_distributed_instance.get_name()
		const handler = arg_distributed_instance[arg_method].bind(arg_distributed_instance)

		// DEBUG
		this.debug( format('msg_register:engine=[%s] instance=[%s] channel=[%s] method=[%s]', this.get_name(), name, arg_channel, arg_method) )

		this.msg_add_recipient(name, arg_distributed_instance)
		
		return this.msg_subscribe(arg_channel, handler, name)
	}



	/**
	 * Test if bus has given named recipient.
	 * 
	 * Only registered names match: inherited object keys ('constructor', 'toString'...) are not recipients.
	 * 
	 * @param {string} arg_name - recipient name.
	 * 
	 * @returns {boolean}
	 */
	msg_has_recipient(arg_name)
	{
		return T.isString(arg_name) && Object.prototype.hasOwnProperty.call(this._recipients, arg_name)
	}


	/**
	 * Add a recipient.
	 * 
	 * Nothing is done if a recipient with the same name is already registered.
	 * A new recipient gets a no-op handler until a subscription replaces it.
	 * 
	 * @param {string} arg_name - recipient name.
	 * @param {DistributedInstance|string} arg_instance - distributed recipient instance, 'browser' or 'remote' (default:'remote').
	 * 
	 * @returns {nothing}
	 * 
	 * @throws {AssertionError} - on empty name or bad instance.
	 */
	msg_add_recipient(arg_name, arg_instance='remote')
	{
		assert( T.isNotEmptyString(arg_name), this.get_context() + ':msg_add_recipient:bad recipient name [' + arg_name + '].')
		assert( arg_instance=='browser' || arg_instance=='remote' || T.isObject(arg_instance) && arg_instance.is_distributed_instance, this.get_context() + ':msg_add_recipient:bad distributed instance.')
		
		if ( Object.prototype.hasOwnProperty.call(this._recipients, arg_name) )
		{
			return
		}

		this._recipients[arg_name] = arg_instance
		this._recipients_handlers[arg_name] = ()=>{}
	}


	/**
	 * Remove a recipient and its handler.
	 * 
	 * Nothing is done if the recipient is unknown.
	 * 
	 * @param {string} arg_name - recipient name.
	 * 
	 * @returns {nothing}
	 * 
	 * @throws {AssertionError} - on empty name.
	 */
	msg_remove_recipient(arg_name)
	{
		assert( T.isNotEmptyString(arg_name), this.get_context() + ':msg_remove_recipient:bad recipient name [' + arg_name + '].')
		
		if ( Object.prototype.hasOwnProperty.call(this._recipients, arg_name) )
		{
			delete this._recipients[arg_name]
			delete this._recipients_handlers[arg_name]
		}
	}


	/**
	 * Normalize given message.
	 * 
	 * A value flagged as distributed message but without get_channel method
	 * is wrapped into the class matching its flags (logs, metrics or plain message).
	 * 
	 * @param {object} arg_msg - message to normalize.
	 * 
	 * @returns {DistributedMessage|undefined} - normalized message or undefined if the value is not a distributed message.
	 */
	normalize_msg(arg_msg)
	{
		if (! T.isObject(arg_msg) || ! arg_msg.is_distributed_message)
		{
			return undefined
		}

		// ALREADY AN INSTANCE WITH ITS METHODS
		if ( T.isFunction(arg_msg.get_channel) )
		{
			return arg_msg
		}

		if ( arg_msg.is_distributed_logs)
		{
			return new DistributedLogs(arg_msg)
		}

		if ( arg_msg.is_distributed_metrics)
		{
			return new DistributedMetrics(arg_msg)
		}

		return new DistributedMessage(arg_msg)
	}
}