Reference Source

js/nodes/node.js

// NPM IMPORTS
import assert from 'assert'

// COMMON IMPORTS
import T                  from 'devapt-core-common/dist/js/utils/types'
import ServiceResponse    from 'devapt-core-common/dist/js/services/service_response'

// SERVER IMPORTS
import runtime            from '../base/runtime'
import NodeMessaging      from './node_messaging'
import MetricsNodeFeature from './metrics_node_feature'
import ServersNodeFeature from './servers_node_feature'
import BusNodeFeature     from './bus_node_feature'



let context = 'server/nodes/node'


const STATE_CREATED = 'NODE_IS_CREATED'
// const STATE_REGISTERING = 'NODE_IS_REGISTERING_TO_MASTER'
// const STATE_WAITING = 'NODE_IS_WAITING_ITS_SETTINGS'
const STATE_LOADING = 'NODE_IS_LOADING_ITS_SETTINGS'
const STATE_LOADED = 'NODE_HAS_LOADED_ITS_SETTINGS'
const STATE_STARTING = 'NODE_IS_STARTING'
const STATE_STARTED = 'NODE_IS_STARTED'
const STATE_STOPPING = 'NODE_IS_STOPPING'
const STATE_STOPPED = 'NODE_IS_STOPPED'
// const STATE_UNREGISTERING = 'NODE_IS_UNREGISTERING_TO_MASTER'



/**
 * Node class: node is an item of a topology and manages a set of servers.
 * 
 * @author Luc BORIES
 * @license Apache-2.0
 */
export default class Node extends NodeMessaging
{
	/**
	 * Create a Node instance.
	 * 
	 * @param {string} arg_name - resource name.
	 * @param {object} arg_settings - resource settings.
	 * @param {string} arg_log_context - trace context string (optional, default=context).
	 * 
	 * @returns {nothing}
	 */
	constructor(arg_name, arg_settings, arg_log_context=context)
	{
		assert( T.isObject(arg_settings.runtime), arg_log_context + ':bad runtime instance')
		assert( T.isObject(arg_settings), arg_log_context + ':bad settings object')
		
		super(arg_name, arg_settings, arg_log_context)
		
		/**
		 * Class type flag.
		 * @type {boolean}
		 */
		this.is_node = true
		
		// CREATE NODE FEATURES
		this.servers_feature     = new ServersNodeFeature(this, 'servers')
		this.msg_bus_feature     = new BusNodeFeature(this, 'msg_bus')
		this.logs_bus_feature    = new BusNodeFeature(this, 'logs_bus')
		this.metrics_bus_feature = new BusNodeFeature(this, 'metrics_bus')
		this.metrics_feature     = undefined

		this.features = []
		this.features.push(this.servers_feature)
		
		this.remote_nodes = {}

		this.switch_state(STATE_CREATED)
	}
	
	
	
	/**
	 * Load Node with starting settings.
	 * 
	 * @returns {nothing}
	 */
	load()
	{
		this.enter_group('load()')
		
		// console.log(context + ':load:Node')

		// LOAD BUSES FEATURES BEFORE LOADING DISTRIBUTED INSTANCE PARENT CLASS
		this.msg_bus_feature.load()
		this.logs_bus_feature.load()
		this.metrics_bus_feature.load()

		super.load()
		
		this.msg_bus = this.msg_bus_feature.bus
		this.metrics_bus = this.metrics_bus_feature.bus
		this.logs_bus = this.logs_bus_feature.bus

		this.msg_bus.get_bus_engine().channel_add('default')
		this.metrics_bus.get_bus_engine().channel_add('metrics')
		this.logs_bus.get_bus_engine().channel_add('logs')

		this.enable_on_bus(this.msg_bus, 'default', 'receive_msg')

		if (this.is_master)
		{
			this.enable_on_bus(this.msg_bus, 'default', 'receive_msg', 'master')
		}
	
		// DEBUG
		// if (! this.is_master)
		// {
		// 	this.msg_bus.get_bus_engine().channel_on('default',
		// 		(payload)=>{
		// 			console.log('MESSAGE HANDLER', payload)
		// 		}
		// 	)
		// }

		this.leave_group('load()')
	}

	
	// TODO API
	update_trace_enabled_stage_1()
	{
		this.msg_bus.update_trace_enabled()
		this.msg_bus.get_bus_engine().update_trace_enabled()
		
		this.metrics_bus.update_trace_enabled()
		this.metrics_bus.get_bus_engine().update_trace_enabled()
		
		this.logs_bus.update_trace_enabled()
		this.logs_bus.get_bus_engine().update_trace_enabled()
	}

	// TODO API
	update_trace_enabled_stage_2()
	{
		if (this.is_master && this.metrics_feature)
		{
			this.metrics_feature.get_metrics_server().update_trace_enabled()
		}
	}
	
	
	
	/**
	 * Load Node features.
	 * 
	 * @returns {nothing}
	 */
	load_features()
	{
		// UODATE TRACE FLAG
		this.update_trace_enabled()
		
		const self = this

		// DEBUG
		// this.enable_trace()

		this.enter_group('load_features()')
		
		// CREATE MASTER INIT
		if (this.is_master)
		{
			this.metrics_feature = new MetricsNodeFeature(this, 'metrics')
			this.metrics_feature.load()
		}

		// LOAD EXISTING FEATURES
		this.features.forEach(
			(feature) => {
				self.info('Loading feature [' + feature.get_name() + ']')
				
				feature.load()
				
				self.info('Feature is loaded [' + feature.get_name() + ']')
			}
		)

		if (this.is_master)
		{
			this.features.push(this.metrics_feature)
		}

		this.leave_group('load_features()')
	}
	
	
	
	/**
	 * Load settings on master node.
	 * 
	 * @param {object} arg_settings - master node settings.
	 * 
	 * @returns {nothing}
	 */
	load_topology_settings(arg_settings)
	{
		// UODATE TRACE FLAG
		this.update_trace_enabled()

		this.enter_group('load_topology_settings')
		this.switch_state(STATE_LOADING)

		// console.log(arg_settings, 'node.loading_master_settings:arg_settings')
		
		// CHECK NODE SERVERS SETTINGS
		assert( T.isObject(arg_settings), context + ':load_topology_settings:bad master settings object')
		assert( T.isFunction(arg_settings.has), context + ':load_topology_settings:bad settings object')
		assert( arg_settings.has('servers'), context + ':load_topology_settings:unknow settings.servers')
		
		// GET NODE SERVERS SETTINGS
		let servers = arg_settings.get('servers')
		assert( T.isObject(servers), context + ':load_topology_settings:bad settings.servers object')
		const bindings = runtime.get_setting('servers_bindings', undefined)
		if (bindings)
		{
			const bindings_js = bindings.toJS()

			// DEBUG
			// console.log(context + ':load_topology_settings:settings.servers_bindings', bindings_js)
			assert( T.isArray(bindings_js), context + ':load_topology_settings:bad settings.servers_bindings array')
			
			bindings_js.forEach(
				(binding_record) => {
					if (binding_record.node == this.get_name() && servers.has(binding_record.server) )
					{
						servers = servers.setIn([binding_record.server, 'host'], binding_record.host )
						servers = servers.setIn([binding_record.server, 'port'], binding_record.port )
					}
				}
			)
		}

		// UDPATE NODE SETTINGS WITH SERVERS
		this.$settings = this.$settings.set('servers', servers)
		
		// LOAD NODE FEATURES (servers)
		this.load_features()
		
		this.switch_state(STATE_LOADED)
		this.leave_group('load_topology_settings')
	}



	/**
	 * TODO get_metrics_server_name
	 */
	get_metrics_server_name()
	{
		return 'metrics_server'
	}

	
	
	/**
	 * Get metrics server instance.
	 * 
	 * @returns {Server} - Metrics server.
	 */
	get_metrics_server()
	{
		if ( ! this.metrics_feature)
		{
			return undefined
		}
		
		const srv = this.metrics_feature.get_metrics_server()
		assert( T.isObject(srv), context + ':get_metrics_server:bad metrics_server object')
		return srv
	}
	
	
	
	/**
	 * Get Node instance servers collection.
	 * 
	 * @returns {Collection} - Node servers.
	 */
	get_servers()
	{
		const srv = this.servers_feature.servers
		assert( T.isObject(srv), context + ':get_servers:bad servers object')
		return srv
	}
	
	
	
	/**
	 * Process received message.
	 * 
	 * @param {DistributedMessage} arg_msg - message object.
	 * 
	 * @returns {nothing}
	 */
	receive_msg(arg_msg)
	{
		this.enter_group('receive_msg')

		// DEBUG
		// console.log(context + ':receive_msg:from=%s to=%s', arg_msg.get_sender(), arg_msg.get_target())

		// DO NOT PROCESS MESSAGES FROM SELF
		if (arg_msg.get_sender() == this.get_name())
		{
			return
		}

        // CHECK SENDER
		assert( T.isString(arg_msg.get_sender()), context + ':receive_msg:bad sender string')
		this.info('receiving a message from ' + arg_msg.get_sender())
		
        // CHECK PAYLOAD
		const payload = arg_msg.get_payload()
		assert( T.isObject(payload), context + ':receive_msg:bad payload object')

        // DEBUG
		// console.log(context + ':receive_msg:sender=[%s] payload=[%s]', arg_msg.get_sender(), JSON.stringify(payload) )

		// PINGPONG FEATURE
		if (payload.service == 'pingpong' && payload.operation == 'devapt-ping')
		{
			const response = new ServiceResponse(payload)
			response.set_results(['devapt-pong', this.get_name()])
			
			// DEBUG
			// console.log(context + ':receive_msg:reply pong:', response)

			this.send_msg(arg_msg.get_sender(), response.get_properties_values())

			this.leave_group('receive_msg')
			return
		}

		if (this.is_master)
		{
			this.receive_master_msg(arg_msg.get_sender(), payload)
		}
		else
		{
			this.receive_node_msg(arg_msg.get_sender(), payload)
		}

		this.leave_group('receive_msg')
	}
	

	
	/**
	 * Process received message on master node.
	 * 
	 * @param {string} arg_sender - message emitter name.
	 * @param {object} arg_payload - message content.
	 * 
	 * @returns {nothing}
	 */
	receive_master_msg(arg_sender, arg_payload)
	{
		this.enter_group('receive_master_msg')
        
		// CHECK ACTION
		if ( ! T.isString(arg_payload.action) )
		{
			this.warn('receive_master_msg:bad payload.action string received from ' + arg_sender)
			return
		}
		if ( ! T.isObject(arg_payload.node) )
		{
			this.warn('receive_master_msg:bad payload.node object received from ' + arg_sender)
			return
		}
		if ( ! T.isString(arg_payload.node.name) )
		{
			this.warn('receive_master_msg:bad payload.node.name string received from ' + arg_sender)
			return
		}

		// REGISTER A NEW NODE ON MASTER TOPOLOGY
		if (arg_payload.action == 'NODE_ACTION_REGISTERING' && this.get_name() != arg_payload.node.name)
		{
			this.msg_bus.msg_add_recipient(arg_sender, 'remote')

			const cfg = runtime.get_registry().initial_config
			const response_msg = {
				action:'NODE_ACTION_REGISTERING',
				master:this.get_name(),
				settings:cfg
			}

			// DEBUG
			// console.log(context + ':receive_master_msg:send response', response_msg)

			this.remote_nodes[arg_payload.node.name] = arg_payload.node
			this.send_msg(arg_sender, response_msg)
		}

		this.leave_group('receive_master_msg')
	}
	

	
	/**
	 * Process received message not on master node.
	 * 
	 * @param {string} arg_sender - message emitter name.
	 * @param {object} arg_payload - message content.
	 * 
	 * @returns {nothing}
	 */
	receive_node_msg(arg_sender, arg_payload)
	{
		this.enter_group('receive_node_msg')
        
		// DEBUG
		// console.log(context + ':receive_node_msg:send response from %s', arg_sender, arg_payload)

		if (arg_payload.action == 'NODE_ACTION_REGISTERING' && T.isObject(arg_payload.settings))
		{
			// console.log(context + ':receive_node_msg:load_topology_settings')
			const settings = arg_payload.settings
			if ( T.isFunction(this.on_registering_callback) )
			{
				this.on_registering_callback(settings)
				this.leave_group('receive_node_msg:call on_registering_callback')
				return
			}
			this.load_topology_settings(settings)
		}

		this.leave_group('receive_node_msg')
	}
	
	
	
	/**
	 * Starts node features.
	 * 
	 * @returns {nothing}
	 */
	start()
	{
		const self = this
		this.enter_group('start')
		this.switch_state(STATE_STARTING)
		
		this.features.forEach(
			(feature) => {
				self.info('Starting feature [' + feature.get_name() + ']')
				
				feature.start()

				self.info('Feature is started [' + feature.get_name() + ']')
			}
		)
		
		this.switch_state(STATE_STARTED)
		this.leave_group('start')
	}
	
	
	
	/**
	 * Stops node features.
	 * 
	 * @returns {nothing}
	 */
	stop()
	{
		const self = this
		this.enter_group('stop')
		this.switch_state(STATE_STOPPING)
		
		this.features.forEach(
			(feature) => {
				self.info('Stopping feature [' + feature.get_name() + ']')
				
				feature.stop()
				
				self.info('Feature is stopped [' + feature.get_name() + ']')
			}
		)
		
		this.switch_state(STATE_STOPPED)
		this.leave_group('stop')
	}



	/**
	 * Get topology item informations.
	 * 
	 * @param {boolean} arg_deep - get deep sub items information on true (default:false).
	 * @param {object} arg_visited - visited items plain object map.
	 * 
	 * @returns {object} - topology informations (plain object).
	 */
	get_topology_info(arg_deep=true, arg_visited={})
	{
		const info = {
			name:this.get_name(),
			uid_desc:'N/A',
			uid:'N/A',

			tenant:'N/A',
			package:'N/A',
			version:'N/A',
			
			type:'node',
			security:'N/A',
			
			children:['N/A']
		}

		if ( arg_visited && (this.topology_uid in arg_visited) )
		{
			return Object.assign(info, { note:'already dumped' } )
		}

		arg_visited[this.topology_uid] = info

		return info
	}
}