js/messaging/socketio_bus_engine.js
// NPM IMPORTS
import assert from 'assert'
import _ from 'lodash'
import {format} from 'util'
import SocketIOServer from 'socket.io'
import SocketIOClient from 'socket.io-client'
// COMMON IMPORTS
import T from '../utils/types'
import BusEngine from './bus_engine'
const context = 'common/messaging/socketio_bus_engine'
/**
* SocketIO based bus engine class for message bus client or server.
*
* @author Luc BORIES
* @license Apache-2.0
*
* @example
* API:
* ->constructor(arg_name,arg_settings,arg_log_context,arg_logger_manager).
*
* ->channel_list():array - List engine channels.
* ->channel_add(arg_channel):nothing - Add a channel.
* ->channel_send(arg_channel, arg_payload):nothing - Send a message into a channel.
* ->channel_on(arg_channel, arg_handler):nothing - Subscribe on channel inputs.
* ->channel_transform(arg_in_channel, arg_out_channel, arg_xform_handler):nothing - Transform payload of input channel to output channel.
*
*/
export default class SocketIOBusEngine extends BusEngine
{
/**
* Create a bus.
*
* @param {string} arg_name - instance name.
* @param {object} arg_settings - settings.
* @param {string} arg_log_context - trace context.
* @param {LoggerManager} arg_logger_manager - logger manager object (optional).
*
* @returns {nothing}
*/
constructor(arg_name, arg_settings, arg_log_context=context, arg_logger_manager=undefined)
{
super(arg_name, arg_settings, arg_log_context, arg_logger_manager)
/**
* Class type flag.
* @type {boolean}
*/
this.is_socketio_bus_engine = true
this._server = undefined
if (this._engine_type == 'Server')
{
// DEBUG
this.debug( format('constructor:create a bus server on port=[%s]', this._engine_port) )
// console.log(context + ':constructor:create a bus server on port=[%s]', this._engine_port)
this._server = new SocketIOServer(this._engine_port)
this._server.serveClient(false)
}
this._channels = {}
this._subscribers = {}
}
/**
* List engine channels.
*
* @returns {array}
*/
channel_list()
{
return Object.keys(this._channels)
}
/**
* Add a channel.
*
* @param {string} arg_channel - channel name.
*
* @returns {nothing}
*/
channel_add(arg_channel)
{
assert( T.isString(arg_channel), this.get_context() + ':channel_add:bad channel name')
// SERVER
if (this._engine_type == 'Server')
{
// INIT SERVER NSP
this._channels[arg_channel] = this._server.of('/' + arg_channel)
this._channels[arg_channel].on('connect',
(socket)=>{
// DEBUG
this.debug( format('channel_add:on connect:engine=[%s] channel=[%s] id=[%s]', this.get_name(), arg_channel, socket.id) )
// console.log(context + ':channel_add:on connect:engine=[%s] channel=[%s] id=[%s]', this.get_name(), arg_channel, socket.id)
this.on_connect(socket, arg_channel)
socket.on('error',
(value)=>
{
// DEBUG
this.debug( format('channel_add:on error:engine=[%s] channel=[%s] id=[%s] value=', this.get_name(), arg_channel, socket.id, value) )
// console.log(context + ':channel_add:on error:engine=[%s] channel=[%s] id=[%s] value=', this.get_name(), arg_channel, socket.id, value)
this.on_error(socket, arg_channel, value)
}
)
socket.on('bus',
(value)=>{
// DEBUG
this.debug( format('channel_add:on bus:engine=[%s] channel=[%s] id=[%s] value=', this.get_name(), arg_channel, socket.id, value) )
// console.log(context + ':channel_add:on bus:engine=[%s] channel=[%s] id=[%s] value=', this.get_name(), arg_channel, socket.id, value)
this.on_bus(socket, arg_channel, value)
}
)
}
)
this._channels[arg_channel].on('disconnect',
(socket)=>{
// DEBUG
this.debug( format('channel_add:on disconnect:engine=[%s] channel=[%s] id=[%s]', this.get_name(), arg_channel, socket.id) )
// console.log(context + ':channel_add:on disconnect:engine=[%s] channel=[%s] id=[%s]', this.get_name(), arg_channel, socket.id)
this.on_disconnect(socket, arg_channel)
}
)
return
}
// CLIENT
this._channels[arg_channel] = SocketIOClient(this._engine_url + '/' + arg_channel)
// CLIENT DEBUG
// console.log('this._channels[%s]', arg_channel, this._channels[arg_channel])
this._channels[arg_channel].on('bus',
(value)=>{
// DEBUG
this.debug( format('channel_add:on bus:engine=[%s] channel=[%s] id=[%s]', this.get_name(), arg_channel, this._channels[arg_channel].id) )
// console.log(context + ':channel_add:on bus:engine=[%s] channel=[%s] id=[%s]', this.get_name(), arg_channel, this._channels[arg_channel].id)
this.on_bus(this._channels[arg_channel], arg_channel, value)
}
)
}
/**
* Send a message into a channel.
*
* @param {string} arg_channel - channel name string.
* @param {object} arg_payload - payload data object.
*
* @returns {nothing}
*/
channel_send(arg_channel, arg_payload)
{
assert( T.isString(arg_channel), this.get_context() + ':channel_send:bad channel name')
assert( arg_payload, this.get_context() + ':channel_send:bad payload data')
assert( arg_channel in this._channels, this.get_context() + ':channel_send:channel [' + arg_channel + '] stream not found')
this._channels[arg_channel].emit('bus', arg_payload)
}
/**
* Subscribe on channel inputs.
*
* @param {string} arg_channel - channel name string.
* @param {function} arg_handler - f(payload):nothing
* @param {function} arg_predicate - p(payload):boolean
*
* @returns {nothing}
*/
channel_on(arg_channel, arg_handler, arg_predicate=undefined)
{
assert( T.isString(arg_channel), this.get_context() + ':channel_on:bad channel name on engine [' + this.get_name() + ']')
assert( T.isFunction(arg_handler), this.get_context() + ':channel_on:bad handler function on engine [' + this.get_name() + '] for channel [' + arg_channel + ']')
assert( arg_channel in this._channels, this.get_context() + ':channel_on:channel [' + arg_channel + '] stream not found on engine [' + this.get_name() + ']')
// DEBUG
this.debug( format('channel_on:bus[' + this.get_name() + '] channel [' + arg_channel + '] predicate=[%s]', arg_predicate ? arg_predicate.toString() : '') )
// console.log(context + ':channel_on:bus[' + this.get_name() + '] channel [' + arg_channel + ']')
const handler = (value) => {
// DEBUG
this.debug( format('channel_on:bus=[' + this.get_name() + '] channel=[' + arg_channel + '] received value=', value) )
// console.log(context + ':channel_on:bus=[' + this.get_name() + '] channel=[' + arg_channel + '] received value=', value)
// FILTER BY PREDICATE
if ( T.isFunction(arg_predicate) )
{
if ( arg_predicate(value) )
{
arg_handler(value)
return
}
}
// NO VALID PREDICATE
arg_handler(value)
}
if (! (arg_channel in this._subscribers) )
{
this._subscribers[arg_channel] = []
}
this._subscribers[arg_channel].push(handler)
}
on_connect(arg_socket, arg_channel)
{
//
}
on_disconnect(arg_socket, arg_channel)
{
//
}
on_error(arg_socket, arg_channel, arg_value)
{
//
}
on_bus(arg_socket, arg_channel, arg_value)
{
_.forEach(
this._subscribers[arg_channel],
(arg_handler)=>{
arg_handler(arg_value)
}
)
}
}