Reference Source

js/messaging/streams_provider.js

// NPM IMPORTS
// import assert from 'assert'
import _ from 'lodash'

// COMMON IMPORTS
import T               from '../utils/types'
import Instance        from '../base/instance'
import Stream          from '../messaging/stream'


const context = 'common/messaging/streams_provider'



/**
 * Service provider base class.
 * @abstract
 * 
 * @author Luc BORIES
 * @license Apache-2.0
 * 
 * @example
 * 	API:
 * 		->add_stream(arg_stream_name, arg_stream=undefined):nothing - register a new stream.
 * 		->remove_stream(arg_stream_name):nothing - unregister a stream.
 * 		->get_stream(arg_stream_name):Stream - get a registered stream.
 * 		->subscribe(arg_stream_name, arg_subscriber)
 * 		->unsubscribe(arg_stream_name, arg_subscriber)
 * 
 */
export default class SteamsProvider extends Instance
{
	/**
	 * Create a streams provider.
	 * 
	 * @param {string} arg_collection - collection name.
	 * @param {string} arg_class - class name.
	 * @param {string} arg_name - instance name.
	 * @param {Immutable.Map|object} arg_settings - settings plain object
	 * @param {string} arg_log_context - log context.
	 * @param {LoggerManager} arg_logger_manager - logger manager object (optional).
	 * 
	 * @returns {nothing}
	 */
	constructor(arg_collection, arg_class, arg_name, arg_settings, arg_log_context, arg_logger_manager)
	{
		super(arg_collection, arg_class, arg_name, arg_settings, arg_log_context, arg_logger_manager)
		
		this.is_streams_provider = true

		this._streams = {}
	}
	
	

	/**
	 * Register a new stream.
	 * 
	 * @param {string} arg_stream_name - stream name.
	 * @param {Stream} arg_stream - stream instance (optional, default undefined).
	 * 
	 * @returns {nothing}
	 */
	add_stream(arg_stream_name, arg_stream=undefined)
	{
		if (arg_stream_name in this._streams)
		{
			return
		}
		
		// INIT REGISTERING RECORD
		this._streams[arg_stream_name] = {
			subscribers:[],
			stream:undefined,
			own_stream:true,
			unsubscribe:undefined
		}

		// CREATE STREAM IF NEEDED
		if ( ! T.isObject(arg_stream) || ! arg_stream.is_stream )
		{
			arg_stream = new Stream()
			this._streams[arg_stream_name].own_stream = true
		}
		this._streams[arg_stream_name].stream = arg_stream

		// HANDLE POST
		const post_cb = (v) => {
			// console.log(context + ':on stream value for provider %s',  arg_provider_name)
			this.post_to_subscribers(arg_stream_name, v)
		}
		this._streams[arg_stream_name].unsubscribe = this._streams[arg_stream_name].stream.subscribe(post_cb)
	}
	
	

	/**
	 * Unregister a stream.
	 * 
	 * @param {string} arg_stream_name - stream name.
	 * 
	 * @returns {boolean}
	 */
	remove_stream(arg_stream_name)
	{
		if (arg_stream_name in this._streams)
		{
			const record = this._streams[arg_stream_name]

			record.unsubscribe()

			_.forEach(record.subscribers,
				(subscriber)=>{
					this.unsubscribe(arg_stream_name, subscriber)
				}
			)

			if (record.own_stream)
			{
				delete record.stream
			}

			delete this._streams[arg_stream_name]
			return true
		}
		return false
	}
	
	

	/**
	 * Get a stream.
	 * 
	 * @param {string} arg_stream_name - stream name.
	 * 
	 * @returns {Stream}
	 */
	get_stream(arg_stream_name)
	{
		if (arg_stream_name in this._streams)
		{
			const record = this._streams[arg_stream_name]
			return record.stream
		}
		return undefined
	}
	
	

	/**
	 * Test if a stream is registered.
	 * 
	 * @param {string} arg_stream_name - stream name.
	 * 
	 * @returns {boolean}
	 */
	has_stream(arg_stream_name)
	{
		return (arg_stream_name in this._streams)
	}
	
	
	
	/**
	 * Add a subscriber socket.
	 * 
	 * @param {string} arg_stream_name - stream name.
	 * @param {object} arg_subscriber  - subscriber object.
	 * 
	 * @returns {boolean}
	 */
	subscribe(arg_stream_name, arg_subscriber)
	{
		if (arg_stream_name in this._streams)
		{
			const record = this._streams[arg_stream_name]
			record.subscribers.push(arg_subscriber)
			return true
		}

		return false
	}
	
	
	
	/**
	 * Remove a subscriber socket.
	 * 
	 * @param {string} arg_stream_name - stream name.
	 * @param {object} arg_subscriber  - subscriber object.
	 * @param {function} arg_subscriber_equal_fn - test function (default is (a,b)=>(a==b) ).
	 * 
	 * @returns {nothing}
	 */
	unsubscribe(arg_stream_name, arg_subscriber, arg_subscriber_equal_fn=(a,b)=>a==b)
	{
		if (arg_stream_name in this._streams)
		{
			const record = this._streams[arg_stream_name]
			
			_.remove(record.subscribers,
				(subscriber)=>{
					return arg_subscriber_equal_fn(subscriber, arg_subscriber)
				}
			)

			return true
		}

		return false
	}
	
	
	
	/**
	 * Post streams values to subscribers.
	 * 
	 * @param {string} arg_stream_name - stream name.
	 * @param {object} arg_datas - response values.
	 * 
	 * @returns {nothing}
	 */
	post_to_subscribers(arg_stream_name, arg_datas)
	{
		if (arg_stream_name in this._streams)
		{
			const record = this._streams[arg_stream_name]
			
			_.forEach(record.subscribers,
				(subscriber) => {
					this.post_to_subscriber(subscriber, arg_stream_name, arg_datas)
				}
			)
		}
	}
	
	
	
	/**
	 * Post streams values to one subscriber.
	 * @abstract
	 * 
	 * @param {object} arg_subscriber - subscriber object.
	 * @param {string} arg_stream_name - stream name.
	 * @param {object} arg_datas - response values.
	 * 
	 * @returns {nothing}
	 */
	post_to_subscriber(arg_subscriber, arg_stream_name, arg_datas)
	{
		console.log(context + ':post_to_subscriber:stream=[%s] subscriber=', arg_stream_name, arg_subscriber)
		console.log(context + ':post_to_subscriber:stream=[%s] datas=', arg_stream_name, arg_datas)
	}
}