js/base/distributed_instance.js
// NPM IMPORTS
import assert from 'assert'
// COMMON IMPORTS
import T from '../utils/types'
import Instance from '../base/instance'
import DistributedMessage from '../base/distributed_message'
import DistributedMetrics from '../base/distributed_metrics'
import DistributedLogs from '../base/distributed_logs'
/**
* Contextual constant for this file logs.
* @private
*/
const context = 'common/base/distributed_instance'
/**
* Distributed instances base class: enable communication inside a node or between nodes.
*
* @author Luc BORIES
* @license Apache-2.0
*
* @example
* API:
* ->load():nothing
* ->load_topology_settings(arg_settings):nothing
*
* ->send(DistributedMessage|DistributedMetrics|DistributedLogs):boolean
*
* ->enable_on_bus(arg_bus):nothing
* ->disable_on_bus(arg_bus):nothing
*
* API for messages:
* ->send_msg(target, payload):boolean
* ->receive_msg(DistributedMessage):nothing
* ->enable_msg():nothing
* ->disable_msg():nothing
*
* API for metrics:
* ->send_metrics(type, values):boolean
* ->receive_metrics(DistributedMetrics):nothing
* ->enable_metrics():nothing
* ->disable_metrics():nothing
*
* API for logs:
* ->send_logs(ts, level, texts):boolean
* ->receive_logs(DistributedLogs):nothing
* ->enable_logs():nothing
* ->disable_logs():nothing
*
*/
export default class DistributedInstance extends Instance
{
/**
* Create a DistributedInstance.
*
* @param {string} arg_collection - collection name.
* @param {string} arg_name - server name
* @param {string} arg_class - server class name
* @param {object} arg_settings - plugin settings map
* @param {string} arg_log_context - trace context string (optional, default=context).
*
* @returns {nothing}
*/
constructor(arg_collection, arg_name, arg_class, arg_settings, arg_log_context=context)
{
assert( T.isObject(arg_settings.runtime) || (arg_settings.has && arg_settings.has('runtime')), arg_log_context + ':bad runtime instance')
assert( T.isObject(arg_settings.logger_manager) || (arg_settings.has && arg_settings.has('logger_manager') ), arg_log_context + ':bad logger_manager instance')
super(arg_collection, arg_class, arg_name, arg_settings, arg_log_context, arg_settings.logger_manager)
/**
* Class type flag.
* @type {boolean}
*/
this.is_distributed_instance = true
/**
* Messages bus instance.
* @type {MessageBus}
*/
this._msg_bus = undefined
/**
* Metrics messages bus instance.
* @type {MessageBus}
*/
this._metrics_bus = undefined
/**
* Logs messages bus instance.
* @type {MessageBus}
*/
this._logs_bus = undefined
/**
* Bus unsubscribes handlers map.
* @type {object}
*/
this._bus_unsubscribes = {}
// DEBUG
// this.enable_trace()
}
/**
* Load instance settings.
*
* @returns {nothing}
*/
load()
{
// console.log(context + ':load:DistributedInstance')
super.load()
// REGISTER BUSES
if (! this.is_client_runtime)
{
this._msg_bus = this.get_runtime().node.get_msg_bus()
this._metrics_bus = this.get_runtime().node.get_metrics_bus()
this._logs_bus = this.get_runtime().node.get_logs_bus()
}
// DEBUG
// console.log(context + ':load:name=%s this._metrics_bus', this.get_name(), this._metrics_bus.get_name())
}
/**
* Load topology settings.
*
* @param {object} arg_settings - master node settings.
*
* @returns {nothing}
*/
load_topology_settings(arg_settings)
{
this.enter_group('load_topology_settings')
arg_settings = undefined
this.leave_group('load_topology_settings')
return arg_settings
}
/**
* Send a message to an other client.
*
* @param {DistributedMessage} arg_msg - message object: a DistributedMessage or DistributedMetrics or DistributedLogs instance.
*
* @returns {boolean} - message send or not
*/
send(arg_msg)
{
assert( T.isObject(arg_msg), context + ':send:bad message object')
if (this._msg_bus && arg_msg.is_distributed_message)
{
this._msg_bus.msg_post(arg_msg)
return true
}
if (this._metrics_bus && arg_msg.is_distributed_metrics)
{
this._metrics_bus.msg_post(arg_msg)
return true
}
if (this._logs_bus && arg_msg.is_distributed_logs)
{
this._logs_bus.msg_post(arg_msg)
return true
}
assert(false, context + ':send:bad message type: not msg or metrics or logs')
return false
}
/**
* Enable distributed messaging.
*
* @param {MessageBus} arg_bus - message bus.
* @param {string} arg_channel - channel name string (default='default').
* @param {string} arg_method - receiveing method name string (default='receive_msg').
* @param {string} arg_alias - instance alias name string (optional, default undefined).
*
* @returns {nothing}
*/
enable_on_bus(arg_bus, arg_channel='default', arg_method='receive_msg', arg_alias=undefined)
{
const bus_name = arg_bus.get_name() + ':' + arg_channel
this._bus_unsubscribes[bus_name] = arg_bus.msg_register(this, arg_channel, arg_method, arg_alias)
}
/**
* Disable distributed messaging.
*
* @param {MessageBus} arg_bus - message bus.
* @param {string} arg_channel - bus channel string (default='default').
*
* @returns {nothing}
*/
disable_on_bus(arg_bus, arg_channel='default')
{
const bus_name = arg_bus.get_name() + ':' + arg_channel
const unsubscribe = this._bus_unsubscribes[bus_name]
if ( T.isFunction(unsubscribe) )
{
unsubscribe()
}
}
// -------------------------------- MESSAGES -------------------------------------
/**
* Create and send a message to an other client.
*
* @param {string} arg_target_name - recipient name.
* @param {object} arg_payload - message payload plain object.
* @param {string} arg_channel - channel name string (default='default').
*
* @returns {boolean} - message send or not.
*/
send_msg(arg_target_name, arg_payload, arg_channel)
{
// DEBUG
// this.enable_trace()
this.enter_group('send_msg')
let msg = new DistributedMessage(this.get_name(), arg_target_name, arg_payload, arg_channel)
if (this._msg_bus && msg.check_msg_format(msg) )
{
this._msg_bus.msg_post(msg)
this.leave_group('send_msg')
return true
}
this.leave_group('send_msg:bad format')
return false
}
/**
* Process received message (to override in sub classes).
*
* @param {DistributedMessage} arg_msg - message instance.
*
* @returns {nothing}
*/
receive_msg(arg_msg)
{
this.enter_group('receive_msg')
// console.log(context + ':receive_msg:from=%s', arg_msg.sender, arg_msg.payload)
// DO NOT PROCESS MESSAGES FROM SELF
if (arg_msg.sender == this.get_name())
{
this.leave_group('receive_msg:ignore message from itself')
return
}
this.leave_group('receive_msg')
}
/**
* Enable distributed messaging.
*
* @returns {nothing}
*/
enable_msg()
{
this.enable_on_bus(this._msg_bus, 'default', 'receive_msg')
}
/**
* Disable distributed messaging.
*
* @returns {nothing}
*/
disable_msg()
{
this.disable_on_bus(this._msg_bus, 'default')
}
// -------------------------------- METRICS -------------------------------------
/**
* Send a metrics message.
*
* @param {string} arg_target_name - recipient name.
* @param {string} arg_metric_type - type of metrics.
* @param {array} arg_metrics - metrics values array.
*
* @returns {boolean} - message send or not
*/
send_metrics(arg_target_name, arg_metric_type, arg_metrics)
{
this.enter_group('send_metrics')
assert( T.isObject(this._metrics_bus), context + ':send_metrics:bad metrics bus object')
// console.log(context + ':send_metrics:from=%s, to=%s, type=%s', this.get_name(), arg_target_name, arg_metric_type)
let msg = new DistributedMetrics(this.get_name(), arg_target_name, arg_metric_type, arg_metrics)
if (this._metrics_bus && msg.check_msg_format(msg) )
{
this._metrics_bus.msg_post(msg)
// console.log(context + ':send_metrics:from=%s, to=%s, type=%s', this.get_name(), arg_target_name, arg_metric_type)
this.leave_group('send_metrics')
return true
}
console.error(context + ':send_metrics:BAD FORMAT:from=%s, to=%s, type=%s, values=', this.get_name(), arg_target_name, arg_metric_type, arg_metrics)
this.leave_group('send_metrics:bad format')
return false
}
/**
* Process received metrics message (to override in sub classes).
*
* @param {DistributedMetrics} arg_msg - metrics message instance.
*
* @returns {nothing}
*/
receive_metrics(arg_msg)
{
this.enter_group('receive_metrics')
// console.log(context + ':receive_metrics:from=%s', arg_msg.sender, arg_msg.payload)
// DO NOT PROCESS MESSAGES FROM SELF
if (arg_msg.sender == this.get_name())
{
this.leave_group('receive_metrics:ignore message from itself')
return
}
this.leave_group('receive_metrics')
}
/**
* Enable distributed metrics.
*
* @returns {nothing}
*/
enable_metrics()
{
this.enable_on_bus(this._metrics_bus, 'metrics', 'receive_metrics')
}
/**
* Disable distributed metrics.
*
* @returns {nothing}
*/
disable_metrics()
{
this.disable_on_bus(this._metrics_bus, 'metrics')
}
// -------------------------------- LOGS -------------------------------------
/**
* Send a logs message.
*
* @param {string} arg_target_name - recipient name.
* @param {string} arg_timestamp - logs timestamp string.
* @param {string} arg_level - logs level string.
* @param {array} arg_values - logs values array.
*
* @returns {boolean} - message send or not
*/
send_logs(arg_target_name, arg_timestamp, arg_level, arg_values)
{
this.enter_group('send_logs')
let msg = new DistributedLogs(this.get_name(), arg_target_name, arg_timestamp, arg_level, arg_values)
if (this._logs_bus && msg.check_msg_format(msg) )
{
this._logs_bus.msg_post(msg)
this.leave_group('send_logs')
return true
}
this.leave_group('send_logs:bad format')
return false
}
/**
* Process received logs message (to override in sub classes).
*
* @param {DistributedLogs} arg_msg - logs message instance.
*
* @returns {nothing}
*/
receive_logs(arg_msg)
{
this.enter_group('receive_logs')
// console.log(context + ':receive_logs:from=%s', arg_msg.sender, arg_msg.payload)
// DO NOT PROCESS MESSAGES FROM SELF
if (arg_msg.sender == this.get_name())
{
this.leave_group('receive_logs:ignore message from itself')
return
}
this.leave_group('receive_logs')
}
/**
* Enable distributed logs.
*
* @returns {nothing}
*/
enable_logs()
{
this.enable_on_bus(this._logs_bus, 'logs', 'receive_logs')
}
/**
* Disable distributed logs.
*
* @returns {nothing}
*/
disable_logs()
{
this.disable_on_bus(this._logs_bus, 'logs')
}
}