Reference Source

js/nodes/bus_node_feature.js

// NPM IMPORTS
import { fromJS } from 'immutable'

// COMMON IMPORTS
import StreamBusEngine   from 'devapt-core-common/dist/js/messaging/stream_bus_engine'
import SocketIOBusEngine from 'devapt-core-common/dist/js/messaging/socketio_bus_engine'
import MessageBus        from 'devapt-core-common/dist/js/messaging/message_bus'

// SERVER IMPORTS
import NodeFeature from './node_feature'



/**
 * Contextual constant for this file logs.
 * @private
 * @type {string}
 */
const context = 'server/nodes/bus_node_feature'



/**
 * Node feature: manages a bus.
 * 
 * @author Luc BORIES
 * @license Apache-2.0
 * 
 * @example
 * 	Bus engine format:
 * 		{
 * 			package:'default' or 'my bus engine NPM package',
 * 			type:'Server' or 'Client',
 * 			protocole:'http' or 'https',
 * 			host:'...',
 * 			port:'...',
 * 			settings:{} // optional
 * 		}
 */
export default class BusNodeFeature extends NodeFeature
{
	/**
	 * Create a BusNodeFeature instance.
	 * 
	 * @param {Node} arg_node - node instance.
	 * @param {string} arg_name - feature name.
	 * 
	 * @returns {nothing}
	 */
	constructor(arg_node, arg_name)
	{
		super(arg_node, arg_name)
		
		/**
		 * Class type flag.
		 * @type {boolean}
		 */
		this.is_bus_node_feature = true
		
		/**
		 * Feature bus instance, undefined until load() creates it.
		 * @type {MessageBus|undefined}
		 */
		this.bus = undefined
	}


	/**
	 * Get bus unique name.
	 * 
	 * @returns {string} - unique bus name: node name + '_' + feature name.
	 */
	get_bus_unique_name()
	{
		return this.node.get_name() + '_' + this.get_name()
	}



	/**
	 * Create the message bus of this feature from the node settings.
	 * 
	 * Settings are read under ['master', <feature name>, ...] with these defaults:
	 * 	package:'default', type:'Server' on a master node else 'Client',
	 * 	protocole:'https', host and port:undefined, settings:empty immutable map.
	 * 
	 * Package 'default' uses StreamBusEngine, 'socketio' uses SocketIOBusEngine,
	 * any other value is loaded through create_bus_engine().
	 * 
	 * @returns {MessageBus} - created messages bus (its engine is undefined if a custom engine package could not be loaded).
	 */
	create_message_bus()
	{
		const default_bus_package = 'default'
		const default_bus_type = this.node.is_master ? 'Server' : 'Client'
		const default_bus_protocole = 'https'

		const bus_name = this.get_name()
		const engine_name = bus_name + '_engine'
		const read_bus_setting = (arg_key, arg_default)=>this.node.get_setting(['master', bus_name, arg_key], arg_default)

		const bus_pkg = read_bus_setting('package', default_bus_package)
		
		// BUILD ENGINE SETTINGS: USER SETTINGS COMPLETED WITH RUNTIME AND CONNECTION VALUES
		const runtime = this.node.get_runtime()
		const engine_settings = read_bus_setting('settings', fromJS({}) )
			.set('runtime',   runtime)
			.set('type',      read_bus_setting('type', default_bus_type) )
			.set('protocole', read_bus_setting('protocole', default_bus_protocole) )
			.set('host',      read_bus_setting('host', undefined) )
			.set('port',      read_bus_setting('port', undefined) )

		// CREATE BUS ENGINE
		const logger_manager = this.node.get_logger_manager()
		let bus_engine = undefined
		if (bus_pkg === default_bus_package)
		{
			bus_engine = new StreamBusEngine(engine_name, engine_settings, context, logger_manager)
		}
		else if (bus_pkg === 'socketio')
		{
			bus_engine = new SocketIOBusEngine(engine_name, engine_settings, context, logger_manager)
		}
		else
		{
			bus_engine = this.create_bus_engine(bus_pkg, engine_name, engine_settings)
		}

		// A MISSING ENGINE IS REPORTED BUT THE BUS IS STILL CREATED, AS BEFORE
		if (! bus_engine)
		{
			console.warn(context + ':create_message_bus:no bus engine created for bus:' + bus_name + ' with package:' + bus_pkg)
		}

		// CREATE MESSAGES BUS FOR INTRA NODES COMMUNICATION
		const bus_settings = { runtime:runtime, logger_manager:logger_manager }
		
		return new MessageBus(this.get_bus_unique_name(), bus_engine, bus_settings, context)
	}



	/**
	 * Create a bus engine from a custom package.
	 * 
	 * The package path is resolved by the runtime context, the module is loaded with require()
	 * and its 'BusEngine' export (looked up on the module 'default' export when it has one) is instantiated.
	 * Every failure is reported with console.warn and gives an undefined result, nothing is thrown.
	 * 
	 * @param {string} arg_package  - class package name.
	 * @param {string} arg_name - bus engine name.
	 * @param {object} arg_settings - bus engine settings.
	 * 
	 * @returns {BusEngine|undefined} - Bus engine instance or undefined on failure.
	 */
	create_bus_engine(arg_package, arg_name, arg_settings)
	{
		const log_prefix = context + ':create_bus_engine:'

		try
		{
			// GET PACKAGE
			const pkg_path = this.node.get_runtime().get_context().get_absolute_package_path(arg_package)
			const loaded_module = require(pkg_path)
			if (! loaded_module)
			{
				console.warn(log_prefix + 'empty package for bus engine:' + arg_package)
				return undefined
			}

			// UNWRAP TRANSPILED ES MODULE DEFAULT EXPORT
			const pkg = loaded_module.default ? loaded_module.default : loaded_module

			// GET CLASS
			const pkg_class = pkg['BusEngine']
			if (! pkg_class)
			{
				console.warn(log_prefix + 'no BusEngine class in package for bus engine:' + arg_package)
				return undefined
			}

			return new pkg_class(arg_name, arg_settings, context, this.node.get_logger_manager())
		}
		catch(e)
		{
			console.warn(log_prefix + 'bad package name for bus engine:' + arg_package + ' with error:' + String(e))
		}

		return undefined
	}
	

	
	/**
	 * Load the feature: call parent load() and create the bus.
	 * Does nothing when the bus already exists.
	 * 
	 * @returns {nothing}
	 */
	load()
	{
		this.node.enter_group(':BusNodeFeature.load()')
		
		// CREATE THE BUS ONLY ONCE
		if (this.bus)
		{
			this.node.leave_group(':BusNodeFeature.load():already loaded')
			return
		}

		super.load()
		
		this.bus = this.create_message_bus()

		this.node.leave_group(':BusNodeFeature.load()')
	}
	
	
	
	/**
	 * Starts node bus.
	 * Only traces the call for now: no start operation is done on the bus here.
	 * 
	 * @returns {nothing}
	 */
	start()
	{
		this.node.enter_group(':BusNodeFeature.start')
		
		
		this.node.leave_group(':BusNodeFeature.start')
	}
	
	
	
	/**
	 * Stops node bus.
	 * Only traces the call for now: no stop operation is done on the bus here.
	 * 
	 * @returns {nothing}
	 */
	stop()
	{
		this.node.enter_group(':BusNodeFeature.stop')
		
		
		this.node.leave_group(':BusNodeFeature.stop')
	}
}