Reference Source

js/services/messages/messages_svc_provider.js

// NPM IMPORTS
import _ from 'lodash'

// COMMON IMPORTS
import T                  from 'devapt-core-common/dist/js/utils/types'
import ServiceProvider    from 'devapt-core-common/dist/js/services/service_provider'
import ServiceResponse    from 'devapt-core-common/dist/js/services/service_response'
import DistributedMessage from 'devapt-core-common/dist/js/base/distributed_message'
import DistributedLogs    from 'devapt-core-common/dist/js/base/distributed_logs'
import DistributedMetrics from 'devapt-core-common/dist/js/base/distributed_metrics'

// SERVICES IMPORTS


/**
 * Contextual constant for this file logs.
 * @private
 */
const context = 'services/messages/messages_svc_provider'



/**
 * Messages service provider class.
 * 
 * @author Luc BORIES
 * @license Apache-2.0
 * 
 * @example
* 	API:
* 		this._msg_subscriptions = {
* 			sender name:{
* 				bus name:{
* 					channel name:{
* 						socket: server/browser socket,
* 						unsubscribe: function
* 					}
* 				}
* 			}
* 		}
 */
export default class MessagesSvcProvider extends ServiceProvider
{
	/**
	 * Create a messages gateway service provider.
	 * 
	 * @param {string} arg_provider_name - consumer name.
	 * @param {Service} arg_service_instance - service instance.
	 * @param {string} arg_context - logging context label.
	 * 
	 * @returns {nothing}
	 */
	constructor(arg_provider_name, arg_service_instance, arg_context=context)
	{
		super(arg_provider_name, arg_service_instance, arg_context)
		
		/**
		 * Class test flag.
		 * @type {boolean}
		 */
		this.is_messages_svc_provider = true

		/**
		 * Messages subscriptions.
		 * @private
		 * @type {object}
		 */
		this._msg_subscriptions = {}
	}



	/**
	 * Get provider operations names.
	 * @abstract
	 * 
	 * @returns {array}
	 */
	get_operations_names()
	{
		return [
			'devapt-msg-describe', 'devapt-msg-recipients',
			'devapt-msg-send',
			'devapt-msg-subscribe', 'devapt-msg-unsubscribe', 'devapt-msg-subscription'
		].concat( super.get_operations_names() )
	}


	
	/**
	 * Produce service datas on request.
	 * 
	 * @param {ServiceRequest} arg_request - service request instance.
	 * 
	 * @returns {Promise} - promise of ServiceResponse instance.
	 */
	produce(arg_request)
	{
		this.enter_group('produce')

		if ( ! T.isObject(arg_request) || ! arg_request.is_service_request)
		{
			this.leave_group('produce:error:bad request object.')
			return Promise.resolve({error:'bad request object'})
		}

		const response = new ServiceResponse(arg_request)
		const operation = arg_request.get_operation()
		const operands = arg_request.get_operands()

		// DEBUG
		this.debug('produce:request for service=' + this.service.get_name() + ':operation=' + operation)
		// console.log(context + ':produce:request for service=' + this.service.get_name() + ':operation=' + operation)

		// GET BUSES
		const node = this.get_runtime().get_node()
		const msg_bus = node.get_msg_bus()
		const logs_bus = node.get_logs_bus()
		const metrics_bus = node.get_metrics_bus()
		const msg_engine = msg_bus.get_bus_engine()
		const logs_engine = logs_bus.get_bus_engine()
		const metrics_engine = metrics_bus.get_bus_engine()


		if (operation == 'devapt-msg-recipients')
		{
			const bus        = operands.length > 0 ? operands[0] : undefined
			const page_size  = operands.length > 1 ? operands[1] : 99
			const page_index = operands.length > 2 ? operands[2] : 0

			if (bus == 'messages')
			{
				const paged_result = msg_bus.msg_recipients(page_size, page_index)
				response.set_results(paged_result)
				this.leave_group('produce:operation[' + operation + '] for bus [' + bus + ']')
				return Promise.resolve(response)
			}

			if (bus == 'logs')
			{
				const paged_result = logs_bus.msg_recipients(page_size, page_index)
				response.set_results(paged_result)
				this.leave_group('produce:operation[' + operation + '] for bus [' + bus + ']')
				return Promise.resolve(response)
			}

			if (bus == 'metrics')
			{
				const paged_result = metrics_bus.msg_recipients(page_size, page_index)
				response.set_results(paged_result)
				this.leave_group('produce:operation[' + operation + '] for bus [' + bus + ']')
				return Promise.resolve(response)
			}

			// ERROR: BAD BUS NAME
			response.set_has_error(true)
			response.set_error('bad operands bus [' + bus + '] for operation [' + operation + ']')
			response.set_results(operands)

			this.leave_group('produce:error:operation failure [' + operation + ']:bad operands bus [' + bus + '].')
			return Promise.resolve(response)
		}
	
		if (operation == 'devapt-msg-describe')
		{
			const buses = {}

			buses['messages'] = {
				name:msg_bus.get_name(),
				type:'messages',
				engine:msg_engine.get_name(),
				channels:msg_engine.channel_list(),
				recipients:node.get_msg_bus().msg_recipients(99, 0)
			}
			
			buses['logs'] = {
				name:logs_bus.get_name(),
				type:'logs',
				engine:logs_engine.get_name(),
				channels:logs_engine.channel_list(),
				recipients:node.get_logs_bus().msg_recipients(99, 0)
			}
			
			buses['metrics'] = {
				name:metrics_bus.get_name(),
				type:'metrics',
				engine:metrics_engine.get_name(),
				channels:metrics_engine.channel_list(),
				recipients:node.get_metrics_bus().msg_recipients(99, 0)
			}

			response.set_results(buses)

			// DEBUG
			this.debug('produce:reply for service=' + this.service.get_name() + ':operation=' + operation, response.get_properties_values())
			// console.log(context + ':produce:reply for service=' + this.service.get_name() + ':operation=' + operation, response.get_properties_values())

			this.leave_group('produce:operation[' + operation + ']')
			return Promise.resolve(response)
		}


		if (operation == 'devapt-msg-send')
		{
			const response_promise = this.produce_send(arg_request)
			this.leave_group('produce:[' + operation + ']')
			return response_promise
		}


		if (operation == 'devapt-msg-subscribe')
		{
			const response_promise = this.produce_subscribe(arg_request)
			this.leave_group('produce:[' + operation + ']')
			return response_promise
		}


		if (operation == 'devapt-msg-unsubscribe')
		{
			const response_promise = this.produce_unsubscribe(arg_request)
			this.leave_group('produce:[' + operation + ']')
			return response_promise
		}


		if (operation == 'devapt-msg-subscription')
		{
			response.set_results([])

			this.leave_group('produce:[' + operation + ']')
			return Promise.resolve(response)
		}

		this.leave_group('produce:super.')
		return super.produce(arg_request)
	}


	/**
	 * Produce service datas on send request.
	 * 
	 * @param {ServiceRequest} arg_request - service request instance.
	 * 
	 * @returns {Promise} - promise of ServiceResponse instance.
	 */
	produce_send(arg_request)
	{
		this.enter_group('produce_send')

		const response = new ServiceResponse(arg_request)
		const operation = arg_request.get_operation()
		const operands = arg_request.get_operands()
		const node = this.get_runtime().get_node()

		// GET REQUEST OPERANDS
		const sender  = arg_request.get_session_uid()
		const bus     = operands.length > 0 ? operands[0] : undefined
		const channel = operands.length > 1 ? operands[1] : 'default'
		const target  = operands.length > 2 ? operands[2] : undefined
		const payload = operands.length > 3 ? operands[3] : undefined

		if (bus == 'messages')
		{
			// CHECK REQUEST OPERANDS
			if ( ! T.isString(channel) || ! T.isString(target) || ! T.isObject(payload) )
			{
				response.set_has_error(true)
				response.set_error('bad operands to send a message.')
				response.set_results(operands)

				this.leave_group('produce_send:error:operation failure [' + operation + ']:bad operands to send a message.')
				return Promise.resolve(response)
			}

			const msg = new DistributedMessage(sender, target, payload, channel)
			node.get_msg_bus().msg_post(msg)

			this.leave_group('produce_send:operation[' + operation + '] for bus=[' + bus + '], channel=[' + channel + '], target=[' + target + ']')
			return Promise.resolve(response)
		}

		if (bus == 'logs')
		{
			// CHECK REQUEST OPERANDS
			if ( ! T.isString(channel) || ! T.isString(target) || ! T.isObject(payload) )
			{
				response.set_has_error(true)
				response.set_error('bad operands to send logs.')
				response.set_results(operands)

				this.leave_group('produce_send:error:operation failure [' + operation + ']:bad operands to send logs.')
				return Promise.resolve(response)
			}

			// CHECK LOGS CONTENT
			if ( ! T.isString(payload.timestamp) || ! T.isString(payload.level) || ! T.isArray(payload.values) )
			{
				response.set_has_error(true)
				response.set_error('bad operands payload to send logs: {timestamp:"", level:"", values:[]}.')
				response.set_results(operands)

				this.leave_group('produce_send:error:operation failure [' + operation + ']:bad operands payload to send logs: {timestamp:"", level:"", values:[]}.')
				return Promise.resolve(response)
			}

			const msg = new DistributedLogs(sender, target, payload.timestamp, payload.level, payload.values)
			node.get_logs_bus().msg_post(msg)
			
			this.leave_group('produce_send:operation[' + operation + '] for bus=[logs]')
			return Promise.resolve(response)
		}

		if (bus == 'metrics')
		{
			// CHECK REQUEST OPERANDS
			if ( ! T.isString(channel) || ! T.isString(target) || ! T.isObject(payload) )
			{
				response.set_has_error(true)
				response.set_error('bad operands to send metrics.')
				response.set_results(operands)

				this.leave_group('produce_send:error:operation failure [' + operation + ']:bad operands to send metrics.')
				return Promise.resolve(response)
			}

			// CHECK METRICS CONTENT
			if ( ! T.isString(payload.type) || ! T.isArray(payload.values) )
			{
				response.set_has_error(true)
				response.set_error('bad operands payload to send metrics: {type:"", values:[]}.')
				response.set_results(operands)

				this.leave_group('produce_send:error:operation failure [' + operation + ']:bad operands payload to send metrics: {type:"", values:[]}.')
				return Promise.resolve(response)
			}

			const msg = new DistributedMetrics(sender, target, payload.type, payload.values)
			node.get_metrics_bus().msg_post(msg)
			
			this.leave_group('produce_send:operation[' + operation + '] for bus=[metrics]')
			return Promise.resolve(response)
		}

		// ERROR: BAD BUS NAME
		response.set_has_error(true)
		response.set_error('bad operands bus [' + bus + '] for operation [' + operation + ']')
		response.set_results(operands)

		this.leave_group('produce_send:error:operation failure [' + operation + ']:bad operands bus [' + bus + '].')
		return Promise.resolve(response)
	}



	/**
	 * Produce service datas on subscribe request.
	 * 
	 * @param {ServiceRequest} arg_request - service request instance.
	 * 
	 * @returns {Promise} - promise of ServiceResponse instance.
	 */
	produce_subscribe(arg_request)
	{
		this.enter_group('produce_subscribe')

		const response = new ServiceResponse(arg_request)
		const operation = arg_request.get_operation()
		const operands = arg_request.get_operands()
		const node = this.get_runtime().get_node()

		// // GET REQUEST OPERANDS
		const sender  = arg_request.get_session_uid()
		const bus     = operands.length > 0 ? operands[0] : undefined
		const channel = operands.length > 1 ? operands[1] : 'default'
		
		if (bus == 'messages')
		{
			// CHECK REQUEST OPERANDS
			if ( ! T.isString(channel) )
			{
				response.set_has_error(true)
				response.set_error('bad operands channel to subscribe on messages.')
				response.set_results(operands)

				this.leave_group('produce_subscribe:error:operation failure [' + operation + ']:bad operands channel to subscribe on messages.')
				return Promise.resolve(response)
			}

			// SUBSCRIPTION EXISTS
			if ( this.has_subscription(sender, bus, channel) )
			{
				response.set_has_error(true)
				response.set_error('messages subscription already exists for sender [' + sender + '].')
				response.set_results(operands)

				this.leave_group('produce_subscribe:error:operation failure [' + operation + ']:messages subscription already exists for sender [' + sender + '].')
				return Promise.resolve(response)
			}


			// CREATE SESSION STREAM
			const socket = arg_request.get_socket()
			this._msg_subscriptions[sender] = this.init_subscription(sender, bus, channel, socket)

			const handler = (arg_msg)=>{
				if (arg_msg.get_target() == sender)
				{
					const msg_payload = arg_msg.get_payload()
					const socket_id = msg_payload.socket_id//.split('#')[1]
					
					// SERVICE RESPONSE TRANSPORT
					if ( T.isNotEmptyString(msg_payload.service) && T.isNotEmptyString(msg_payload.operation) && T.isArray(msg_payload.results) )
					{
						// DEBUG
						// console.log(context + ':produce_subscribe:handler for service response:socket_id=[%s] service=[%s] operation=[%s] results=[%a]', socket_id, msg_payload.service, msg_payload.operation, msg_payload.results)
						
						const iosrvs = this.get_runtime().socketio_servers
						const svc_path = '/' + msg_payload.service

						_.forEach(iosrvs,
							(iosrv)=>{
								// DEBUG
								// console.log('svc_path=[%s],socket_id=[%s], iosrv.of(svc_path)=', svc_path, socket_id, iosrv.of(svc_path))
								
								if (svc_path in iosrv.nsps)
								{
									// DEBUG
									// console.log('iosrv.of(...).connected', iosrv.of(svc_path).connected)

									if (socket_id in iosrv.of(svc_path).connected)
									{
										// DEBUG
										// console.log('iosrv.of(...).connected[socket_id]', iosrv.of(svc_path).connected[socket_id])

										iosrv.of(svc_path).connected[socket_id].emit(msg_payload.operation, msg_payload)
									}
								}
							}
						)

						return
					}

					// OTHERS CASES
					const payload = { service:this.service.get_name(), operation:'devapt-msg-subscription', results:['done', arg_msg] }
					
					// DEBUG
					// console.log(context + ':produce_subscribe:default handler:socket_id=[%s] service=[%s] operation=[%s] results=[%a]', socket_id, payload.service, payload.operation, payload.results)

					socket.emit('devapt-msg-subscription', payload)
				}
			}

			node.get_msg_bus().msg_add_recipient(sender, 'browser')
			
			this._msg_subscriptions[sender][bus][channel].unsubscribe = node.get_msg_bus().msg_subscribe(channel, handler, sender)
			
			// UNSUBSCRIBE ON SOCKET CLOSE
			socket.on('disconnect', ()=>{
				if ( this.has_subscription(sender, bus, channel) && this._msg_subscriptions[sender][bus][channel].unsubscribe )
				{
					this._msg_subscriptions[sender][bus][channel].unsubscribe()
				}
				node.get_msg_bus().msg_remove_recipient(sender)
			})
			socket.on('end', ()=>{
				if ( this.has_subscription(sender, bus, channel) && this._msg_subscriptions[sender][bus][channel].unsubscribe )
				{
					this._msg_subscriptions[sender][bus][channel].unsubscribe()
				}
				node.get_msg_bus().msg_remove_recipient(sender)
			})

			response.set_results([])

			this.leave_group('produce_subscribe:operation[' + operation + '] for bus=[messages] for sender [' + sender + ']')
			return Promise.resolve(response)
		}

		// ERROR: BAD BUS NAME
		response.set_has_error(true)
		response.set_error('bad operands bus [' + bus + '] for operation [' + operation + ']')
		response.set_results(operands)

		this.leave_group('produce_subscribe:error:operation failure [' + operation + ']:bad operands bus [' + bus + '].')
		return Promise.resolve(response)
	}



	/**
	 * Test if a message subscription exists.
	 * 
	 * @param {string} arg_sender  - message sender name.
	 * @param {string} arg_bus     - message bus name.
	 * @param {string} arg_channel - message channel name.
	 * 
	 * @returns {boolean}
	 */
	has_subscription(arg_sender, arg_bus, arg_channel)
	{
		if (arg_sender in this._msg_subscriptions)
		{
			const subscription = this._msg_subscriptions[arg_sender]
			if (arg_bus in subscription)
			{
				if (arg_channel in subscription[arg_bus])
				{
					return true
				}
			}
		}

		return false
	}
	
	
	
	/**
	 * Test if a message subscription exists.
	 * 
	 * @param {string} arg_sender  - message sender name.
	 * @param {string} arg_bus     - message bus name.
	 * @param {string} arg_channel - message channel name.
	 * @param {Socket} arg_socket  - subscription socket.
	 * 
	 * @returns {boolean}
	 */
	init_subscription(arg_sender, arg_bus, arg_channel, arg_socket)
	{
		let subscription = this._msg_subscriptions[arg_sender]
		if ( ! subscription)
		{
			subscription = {}
		}

		if ( !(arg_bus in subscription) )
		{
			subscription[arg_bus] = {}
		}

		if ( ! (arg_channel in subscription[arg_bus]) )
		{
			subscription[arg_bus][arg_channel] = {
				socket:arg_socket,
				unsubscribe: undefined
			}
		}

		return subscription
	}
	
	
	
	/**
	 * Produce service datas on unsubscribe request.
	 * 
	 * @param {ServiceRequest} arg_request - service request instance.
	 * 
	 * @returns {Promise} - promise of ServiceResponse instance.
	 */
	produce_unsubscribe(arg_request)
	{
		this.enter_group('produce_unsubscribe')

		const response = new ServiceResponse(arg_request)
		const operation = arg_request.get_operation()
		const operands = arg_request.get_operands()

		// // GET REQUEST OPERANDS
		const sender  = arg_request.get_session_uid()
		const bus     = operands.length > 0 ? operands[0] : undefined
		const channel = operands.length > 1 ? operands[1] : 'default'
		
		if (bus == 'messages')
		{
			// CHECK REQUEST OPERANDS
			if ( ! T.isString(channel) )
			{
				response.set_has_error(true)
				response.set_error('bad operands channel to unsubscribe for messages.')
				response.set_results(operands)

				this.leave_group('produce_unsubscribe:error:operation failure [' + operation + ']:bad operands channel to unsubscribe for messages.')
				return Promise.resolve(response)
			}

			// CHECK SUBSCRIPTION
			if ( ! this.has_subscription(sender, bus, channel) )
			{
				response.set_has_error(true)
				response.set_error('messages subscription doesn t exists for sender [' + sender + '].')
				response.set_results(operands)

				this.leave_group('produce_unsubscribe:error:operation failure [' + operation + ']:messages subscription doesn t exists for sender [' + sender + '].')
				return Promise.resolve(response)
			}
			const subscription = this._msg_subscriptions[sender][bus][channel]

			// REMOVE SESSION STREAM
			const unsubscribe_fn = subscription.unsubscribe
			if (unsubscribe_fn)
			{
				unsubscribe_fn()
			}
			delete this._msg_subscriptions[sender][bus][channel]

			this.leave_group('produce_unsubscribe:operation[' + operation + '] for bus=[messages] for sender [' + sender + ']')
			return Promise.resolve(response)
		}

		// ERROR: BAD BUS NAME
		response.set_has_error(true)
		response.set_error('bad operands bus [' + bus + '] for operation [' + operation + ']')
		response.set_results(operands)

		this.leave_group('produce_unsubscribe:error:operation failure [' + operation + ']:bad operands bus [' + bus + '].')
		return Promise.resolve(response)
	}
}