Reference Source

js/services/metrics/metrics_svc_provider.js

// NPM IMPORTS
import {format} from 'util'

// COMMON IMPORTS
import T                  from 'devapt-core-common/dist/js/utils/types'
import ServiceProvider    from 'devapt-core-common/dist/js/services/service_provider'
import ServiceResponse    from 'devapt-core-common/dist/js/services/service_response'
import {get_runtime}      from 'devapt-core-common/dist/js/base/runtime'

// SERVICES IMPORTS


/**
 * Runtime instance.
 * @private
 * @type {RuntimeBase}
 */
const runtime = get_runtime()

/**
 * Contextual constant for this file logs.
 * @private
 */
const context = 'services/metrics/metrics_svc_provider'



/**
 * Metrics service provider class.
 * 
 * @author Luc BORIES
 * @license Apache-2.0
 * 
 */
export default class MetricsSvcProvider extends ServiceProvider
{
	/**
	 * Create a metrics service provider.
	 * 
	 * @param {string} arg_provider_name - consumer name.
	 * @param {Service} arg_service_instance - service instance.
	 * @param {string} arg_context - logging context label.
	 * 
	 * @returns {nothing}
	 */
	constructor(arg_provider_name, arg_service_instance, arg_context=context)
	{
		super(arg_provider_name, arg_service_instance, arg_context)

		/**
		 * Class test flag.
		 * @type {boolean}
		 */
		this.is_metrics_svc_provider = true

		// CREATE STREAMS
		// this.metrics_streams = {
		// 	'http':new Stream(),
		// 	'bus':new Stream(),
		// 	'host':new Stream(),
		// 	'nodejs':new Stream()
		// }

		// GET NODE METRICS BUS
		/**
		 * Metrics bus stream.
		 * @private
		 * @type {Stream}
		 */
		this._metrics_bus_stream = runtime.node.get_metrics_bus().get_input_stream()
		
		/**
		 * Metrics bus transformed stream.
		 * @private
		 * @type {Stream}
		 */
		this._metrics_bus_stream_transformed = undefined
		
		// DEBUG STREAM
		// this._metrics_bus_streams.subscribe(
		// 	(metrics_record) => {
		// 		console.log('MetricsSvcProvider: new metrics record on the bus', metrics_record)
		// 	}
		// )


		this.add_stream('http')
		this.add_stream('bus')
		this.add_stream('host')
		this.add_stream('nodejs')

		this.init_stream('http')
		this.init_stream('bus')
		this.init_stream('host')
		this.init_stream('nodejs')
	}



	/**
	 * Get provider operations names.
	 * @abstract
	 * 
	 * @returns {array}
	 */
	get_operations_names()
	{
		return [
			'devapt-metrics-get',        'devapt-metrics-list',
			'devapt-metrics-http-get',   'devapt-metrics-http-list',   'devapt-metrics-http-subscribe',
			'devapt-metrics-bus-get',    'devapt-metrics-bus-list',    'devapt-metrics-bus-subscribe',
			'devapt-metrics-host-get',   'devapt-metrics-host-list',   'devapt-metrics-host-subscribe',
			'devapt-metrics-nodejs-get', 'devapt-metrics-nodejs-list', 'devapt-metrics-nodejs-subscribe'
		].concat( super.get_operations_names() )
	}


	
	/**
	 * Produce service datas on request.
	 * 
	 * @param {ServiceRequest} arg_request - service request instance.
	 * 
	 * @returns {Promise} - promise of ServiceResponse instance.
	 */
	produce(arg_request)
	{
		this.enter_group('produce')

		if ( ! T.isObject(arg_request) || ! arg_request.is_service_request)
		{
			this.leave_group('produce:error:bad request object.')
			return Promise.resolve({error:'bad request object'})
		}

		const response = new ServiceResponse(arg_request)
		const operation = arg_request.get_operation()
		const operands = arg_request.get_operands()
		let type = undefined
		let op = 'undefined'

		switch(operation) {
			case 'devapt-subscribe':
				op = 'super'
				break
			case 'devapt-unsubscribe':
				op = 'super'
				break

			case 'devapt-metrics-get':
				op = 'get'
				break
			case 'devapt-metrics-list':
				op = 'list'
				break
			
			case 'devapt-metrics-http-get':
				type = 'http'
				op = 'get'
				break
			case 'devapt-metrics-http-list':
				type = 'http'
				op = 'list'
				break
			case 'devapt-metrics-http-subscribe':
				type = 'http'
				op = 'subscribe'
				break
			case 'devapt-metrics-http-unsubscribe':
				type = 'http'
				op = 'unsubscribe'
				break
			
			case 'devapt-metrics-bus-get':
				type = 'bus'
				op = 'get'
				break
			case 'devapt-metrics-bus-list':
				type = 'bus'
				op = 'list'
				break
			case 'devapt-metrics-bus-subscribe':
				type = 'bus'
				op = 'subscribe'
				break
			case 'devapt-metrics-bus-unsubscribe':
				type = 'bus'
				op = 'unsubscribe'
				break
			
			case 'devapt-metrics-host-get':
				type = 'host'
				op = 'get'
				break
			case 'devapt-metrics-host-list':
				type = 'host'
				op = 'list'
				break
			case 'devapt-metrics-host-subscribe':
				type = 'host'
				op = 'subscribe'
				break
			case 'devapt-metrics-host-unsubscribe':
				type = 'host'
				op = 'unsubscribe'
				break
			
			case 'devapt-metrics-nodejs-get':
				type = 'nodejs'
				op = 'get'
				break
			case 'devapt-metrics-nodejs-list':
				type = 'nodejs'
				op = 'list'
				break
			case 'devapt-metrics-nodejs-subscribe':
				type = 'nodejs'
				op = 'subscribe'
				break
			case 'devapt-metrics-nodejs-unsubscribe':
				type = 'nodejs'
				op = 'unsubscribe'
				break
			
		}

		let operand_index = 0
		if (! type && operands.length > 0 && T.isNotEmptyString(operands[0]) )
		{
			type = operands[0]
			++operand_index
		}
		
		// DEBUG
		const debug_msg = format('produce:service[%s] operation[%s] op[%s] type[%s] operands count[%d] opds index[%d]', response.get_service(), operation, op, type, operands.length, operand_index)
		// console.log(context + debug_msg)
		this.debug(debug_msg)

		// CHECK OPERATION AND TYPE STRINGS
		if (! T.isNotEmptyString(op) || ! T.isNotEmptyString(type) )
		{
			this.fill_error(response, operands, 'bad metrics operation and/or type string', op, type)
			return Promise.resolve(response)
		}

		// GET METRICS SERVER
		const metrics_server = runtime.node.get_metrics_server()
		if (! metrics_server)
		{
			this.fill_error(response, operands, 'metrics server not found', op, type)
			return Promise.resolve(response)
		}

		// OPERATION: GET CURRENT METRICS
		if (op == 'get')
		{
			// GET WITH ONE OPERAND
			const item = operands[operand_index]
			if ( T.isNotEmptyString(item) )
			{
				this.debug('get with one not empty operand string:[' + item + ']')
				const state_values = metrics_server.get_metrics_state_values(type, item)
				
				// console.log(context + debug_msg + ' item=[%s] state_values=', item, bus_state_values)
				
				response.set_results([state_values])

				this.leave_group( debug_msg + format(' metrics values keys[%s]', Object.keys(state_values) ) )
				return Promise.resolve(response)
			}
			
			// DEFAULT CASE
			const state_values = metrics_server.get_metrics_state_values(type)

			// console.log(context + debug_msg + ' state_values=', state_values)
			this.debug(debug_msg + ' state_values=', state_values)
			
			response.set_results([state_values])

			this.leave_group( debug_msg + format(' metrics values keys[%s]', Object.keys(state_values) ) )
			return Promise.resolve(response)
		}
		
		// OPERATION: LIST METRICS ITEMS
		if (op == 'list')
		{
			const state_items = metrics_server.get_metrics_state_values_items(type)

			// console.log(context + debug_msg + ' state_items=', state_items)
			this.debug(debug_msg + ' state_items=', state_items)
			
			const items = T.isArray(state_items) ? state_items : []
			response.set_results(items)

			this.leave_group( debug_msg + format(' metrics items[%s]', items) )
			return Promise.resolve(response)
		}
		
		// OPERATION: SUBSCRIBE
		if (op == 'subscribe')
		{
			const socket = arg_request.get_socket()
			
			if (socket)
			{
				const result = this.subscribe(type, socket)
				if (! result)
				{
					response.set_results([ { error:'subscribe failed' } ].concat(operands) )
					return Promise.resolve(response)
				}

				response.set_results( ['done'].concat(operands) )
				return Promise.resolve(response)
			}
			
			response.set_results([ { error:'bad socket' } ].concat(operands) )
			return Promise.resolve(response)
		}
		
		// OPERATION: UNSUBSCRIBE
		if (op == 'unsubscribe')
		{
			const socket = arg_request.get_socket()
			
			if (socket)
			{
				const result = this.unsubscribe(type, socket)
				if (! result)
				{
					response.set_results([ { error:'subscribe failed' } ].concat(operands) )
					return Promise.resolve(response)
				}

				response.set_results( ['done'].concat(operands) )
				return Promise.resolve(response)
			}
			
			response.set_results([ { error:'bad socket' } ].concat(operands) )
			return Promise.resolve(response)
		}

		this.leave_group('produce:super.')
		return super.produce(arg_request)
	}
	


	/**
	 * Populate a response with error message.
	 * 
	 * @param {ServiceResponse} arg_response - response instance.
	 * @param {array} arg_operands     - request operands.
	 * @param {string} arg_error       - error text.
	 * @param {string} arg_op          - metrics operation.
	 * @param {string} arg_type        - metrics type.
	 * 
	 * @returns {nothing}
	 */
	fill_error(arg_response, arg_operands, arg_error, arg_op, arg_type)
	{
		const op = arg_response.get_operation()
		const svc = arg_response.get_service()
		const error_msg = format('produce:error=[%s] with svc=[%s] operation=[%s] op=[%s] type=[%s] operands count=[%i]', arg_error, svc, op, arg_op, arg_type, arg_operands.length)
		
		arg_response.set_has_error(true)
		arg_response.set_error(error_msg)
		arg_response.set_results(arg_operands)
		
		this.leave_group(error_msg)
	}
	
	
	
	/**
	 * Init output stream.
	 * 
	 * @param {string} arg_type - metrics type.
	 * 
	 * @returns {nothing}
	 */
	init_stream(arg_type)
	{
		const self = this
		
		// CHECK TYPE
		if ( ! this.has_stream(arg_type) )
		{
			console.error(context + ':init_stream:bad metrics type[' + arg_type + ']')
			return
		}

		const max_metrics_per_msg = 10
		const delay_per_metrics_msg = 100
		const limit_cb = (grouped_stream/*, group_start_event*/) => {
			const map_cb = (values) => {
				// console.log(values, 'limit.map.values')
				
				let metrics_record = {
					metric: undefined,
					metrics:[]
				}
				
				values.forEach(
					(value) => {
						metrics_record.metric = value.metric,
						metrics_record.metrics = metrics_record.metrics.concat(value.metrics)
					}
				)
				
				// console.log(metrics_record, 'limit.map.metrics_record')
				return metrics_record
			}
			
			return grouped_stream.bufferWithTimeOrCount(delay_per_metrics_msg, max_metrics_per_msg).map(map_cb)
		}
		
		
		const key_cb = (value) => {
			// console.log(value.metric, 'value.metric')
			return value.metric
		}
		
		
		const flatmap_cb = (grouped_stream) => {
			return grouped_stream
		}
		
		const msg_filter_cb = (arg_msg) => {
			if (arg_msg.payload.metric != arg_type)
			{
				console.warn(context + ':init_stream:metrics filter blocks', arg_msg.payload.metric, arg_type)
			}
			return arg_msg.payload.metric == arg_type
		}
		
		const msg_cb = (arg_msg) => {
			const metric_type = arg_msg.payload.metric
			const metrics_array = arg_msg.payload.metrics
			const metrics_record = {
				metric: metric_type,
				metrics:metrics_array
			}
			
			return metrics_record
		}
		
		self._metrics_bus_stream_transformed = self._metrics_bus_stream.get_transformed_stream().filter(msg_filter_cb).map(msg_cb).groupBy(key_cb, limit_cb).flatMap(flatmap_cb)
		
		self._metrics_bus_stream_transformed.onValue(
			(metrics_record) => {
				console.log(context + ':init_stream:new metrics record')
				self.get_stream(arg_type).push(metrics_record)
			}
		)
	}
}