js/services/logs/logs_svc_provider.js
// NPM IMPORTS
// import assert from 'assert'
// COMMON IMPORTS
import T from 'devapt-core-common/dist/js/utils/types'
import ServiceProvider from 'devapt-core-common/dist/js/services/service_provider'
import ServiceResponse from 'devapt-core-common/dist/js/services/service_response'
// SERVICES IMPORTS
/**
* Contextual constant for this file logs.
* @private
*/
const context = 'services/logs/logs_svc_provider'
/**
* Logs service provider class.
* @author Luc BORIES
* @license Apache-2.0
*/
export default class LogsSvcProvider extends ServiceProvider
{
/**
* Create a assets service provider.
*
* @param {string} arg_provider_name - consumer name.
* @param {Service} arg_service_instance - service instance.
* @param {string} arg_context - logging context label.
*
* @returns {nothing}
*/
constructor(arg_provider_name, arg_service_instance, arg_context=context)
{
super(arg_provider_name, arg_service_instance, arg_context)
/**
* Class test flag.
* @type {boolean}
*/
this.is_logs_svc_provider = true
// GET INPUT STREAM TO FORWARD TO SUBSCRIBERS
/**
* Logs bus stream.
* @type {Stream}
*/
this.logs_bus_stream = this.get_runtime().node.get_logs_bus().get_input_stream()
this.init_logs_bus_stream()
// DEBUG
// this.logs_bus_stream.subscribe(
// (logs_record) => {
// console.log('LogsSvcProvider: new logs record on the bus', logs_record)
// this.provided_values_stream.push(logs_record)
// }
// )
}
/**
* Get provider operations names.
* @abstract
*
* @returns {array}
*/
get_operations_names()
{
return ['devapt-log'].concat(super.get_operations_names())
}
/**
* Produce service datas on request.
*
* @param {ServiceRequest} arg_request - service request instance.
*
* @returns {Promise} - promise of ServiceResponse instance.
*/
produce(arg_request)
{
const operation = arg_request.get_operation()
// const operands = arg_request.get_operands()
// // CHECK OPERANDS
// if ( ! T.isNotEmptyArray(operands) )
// {
// return Promise.resolve(undefined)
// }
const response = new ServiceResponse(arg_request)
// SUBSCRIBE TO PROVIDER STREAM DATAS
if (operation == 'devapt-log')
{
// TODO
response.set_results([ { error:'...' } ])
return Promise.resolve(response)
}
return super.produce(arg_request)
}
/**
* Initialize bus stream.
*
* @returns {nothing}
*/
init_logs_bus_stream()
{
const max_logs_per_msg = 10
const delay_per_logs_msg = 100
const self = this
const limit_cb = (grouped_stream/*, group_start_event*/) => {
const map_cb = (values) => {
// console.log(values, 'limit.map.values')
let logs_record = {
ts:undefined,
level: undefined,
source:undefined,
logs:[]
}
values.forEach(
(value) => {
logs_record.ts = value.ts,
logs_record.level = value.level,
logs_record.source = value.source,
logs_record.logs = logs_record.logs.concat(value.logs)
}
)
// console.log(logs_record, 'limit.map.logs_record')
return logs_record
}
return grouped_stream.bufferWithTimeOrCount(delay_per_logs_msg, max_logs_per_msg).map(map_cb)
}
const key_cb = (value) => {
// console.log(value.level, 'value.level')
return value.level
}
const flatmap_cb = (grouped_stream) => {
return grouped_stream
}
const msg_cb = (arg_msg) => {
let logs_ts = undefined
let logs_level = undefined
let logs_source = undefined
let logs_array = undefined
// DEBUG
// debugger
if ( T.isObject(arg_msg) && arg_msg.is_distributed_logs )
{
logs_ts = arg_msg.get_logs_ts()
logs_level = arg_msg.get_logs_level()
logs_source = arg_msg.get_logs_source()
logs_array = arg_msg.get_logs_values()
}
else if ( T.isObject(arg_msg) && T.isString(arg_msg.level) && T.isArray(arg_msg.logs) )
{
logs_ts = arg_msg.ts
logs_level = arg_msg.level
logs_source = arg_msg.source
logs_array = arg_msg.logs
}
const logs_record = {
ts: logs_ts,
level: logs_level,
source: logs_source,
logs:logs_array
}
return logs_record
}
self.logs_bus_stream_transfomed = self.logs_bus_stream.get_transformed_stream().map(msg_cb).groupBy(key_cb, limit_cb).flatMap(flatmap_cb)
self.logs_bus_stream_transfomed.onValue(
(logs_record) => {
this.get_stream('default').push(logs_record)
}
)
}
}