Reference Source

js/messaging/stream.js

  1. // NPM IMPORTS
  2. import assert from 'assert'
  3. import Baconjs from 'baconjs'
  4.  
  5. // COMMON IMPORTS
  6. import T from '../utils/types'
  7.  
  8. let context = 'common/messaging/stream'
  9.  
  10.  
  11.  
  12. /**
  13. * Stream class for BaconJS stream wrapping.
  14. *
  15. * @author Luc BORIES
  16. * @license Apache-2.0
  17. */
  18. export default class Stream
  19. {
  20. /**
  21. * Create a stream.
  22. *
  23. * @returns {nothing}
  24. */
  25. constructor(arg_stream=undefined)
  26. {
  27. this.is_stream = true
  28.  
  29. this._source_stream = arg_stream ? arg_stream : new Baconjs.Bus()
  30. this._transformed_stream = this._source_stream
  31. this.counters = {}
  32. this.counters.msg_count = 0
  33. this.counters.msg_size = 0
  34. this.counters.errors_count = 0
  35. this.counters.subscribers_count = 0
  36. this._source_stream.onError(
  37. () => {
  38. this.counters.errors_count += 1
  39. }
  40. )
  41. }
  42.  
  43.  
  44.  
  45. /**
  46. * Get input stream.
  47. *
  48. * @returns {Baconjs.Bus}
  49. */
  50. get_source_stream()
  51. {
  52. return this._source_stream
  53. }
  54.  
  55.  
  56.  
  57. /**
  58. * Create a Stream instance with a DOM event source stream.
  59. *
  60. * @param {string} arg_dom_elem - DOM element.
  61. * @param {string} arg_event_name - DOM event name.
  62. *
  63. * @returns {Stream}
  64. */
  65. static from_dom_event(arg_dom_elem, arg_event_name)
  66. {
  67. return new Stream( Baconjs.fromEvent(arg_dom_elem, arg_event_name) )
  68. }
  69.  
  70.  
  71.  
  72. /**
  73. * Create a Stream instance with an event emitter source stream.
  74. *
  75. * @param {object} arg_emitter - event emitter.
  76. * @param {string} arg_event_name - event name.
  77. *
  78. * @returns {Stream}
  79. */
  80. static from_emitter_event(arg_emitter, arg_event_name)
  81. {
  82. return new Stream( Baconjs.fromEvent(arg_emitter, arg_event_name) )
  83. }
  84.  
  85.  
  86.  
  87. /**
  88. * Get output stream.
  89. *
  90. * @returns {Baconjs.Bus}
  91. */
  92. get_transformed_stream()
  93. {
  94. return this._transformed_stream
  95. }
  96.  
  97.  
  98.  
  99. /**
  100. * Set output stream.
  101. *
  102. * @param {Baconjs.Bus} arg_stream - transformed stream.
  103. *
  104. * @returns {Stream} - this
  105. */
  106. set_transformed_stream(arg_stream)
  107. {
  108. this._transformed_stream = arg_stream
  109. return this
  110. }
  111.  
  112.  
  113.  
  114. /**
  115. * Set output stream transformation.
  116. *
  117. * @param {function} arg_stream_transformation - function (source stream)=>{ return transformed stream }.
  118. *
  119. * @returns {Stream} - this
  120. */
  121. set_transformation(arg_stream_transformation)
  122. {
  123. assert( T.isFunction(arg_stream_transformation), context + ':transform:bad function')
  124. const src = this._source_stream
  125. const tr = this._transformed_stream
  126.  
  127. try {
  128. this._transformed_stream = arg_stream_transformation(src)
  129. } catch(e) {
  130. this._transformed_stream = tr
  131. console.error(context + ':set_transformation', e)
  132. }
  133. return this
  134. }
  135. /**
  136. * Get counters snapshot.
  137. *
  138. * @returns {object} - counters.
  139. */
  140. get_counters_snapshot()
  141. {
  142. const counters = Object.assign({}, this.counters)
  143. return counters
  144. }
  145. /**
  146. * Get counters snapshot and reset values to 0.
  147. */
  148. get_and_reset_counters_snapshot()
  149. {
  150. const counters = Object.assign({}, this.counters)
  151. this.counters.msg_count = 0
  152. this.counters.msg_size = 0
  153. this.counters.errors_count = 0
  154. this.counters.subscribers_count = 0
  155. return counters
  156. }
  157. /**
  158. * Push a value into the stream.
  159. *
  160. * @param {any}
  161. *
  162. * @returns {Stream} - this.
  163. */
  164. push(arg_value)
  165. {
  166. this.counters.msg_count += 1
  167. // console.log(arg_value, context + ':push:value')
  168. this._source_stream.push(arg_value)
  169. return this
  170. }
  171. /**
  172. * Subscribe to stream values.
  173. *
  174. * @param {Function} arg_handler - value handler f(value) => nothing.
  175. *
  176. * @returns {Function} - unsubscribe function
  177. */
  178. subscribe(arg_handler)
  179. {
  180. assert( T.isFunction(arg_handler), context + ':subscribe:bad handler function')
  181. this.counters.subscribers_count += 1
  182. const unsubscribe = this._transformed_stream.onValue(arg_handler)
  183. return () => {
  184. this.counters.subscribers_count -= 1
  185. unsubscribe()
  186. }
  187. // return this._transformed_stream.onValue(
  188. // (value) => {
  189. // console.log(value, context + ':subscribe:value')
  190. // }
  191. // )
  192. }
  193. /**
  194. * Subscribe to stream errors.
  195. *
  196. * @param {Function} arg_handler - value handler f(value) => nothing.
  197. *
  198. * @returns {Function} - unsubscribe function
  199. */
  200. on_error(arg_handler)
  201. {
  202. assert( T.isFunction(arg_handler), context + ':subscribe:bad handler function')
  203. this.counters.subscribers_count += 1
  204. const unsubscribe = this._transformed_stream.onError(arg_handler)
  205. return () => {
  206. this.counters.subscribers_count -= 1
  207. unsubscribe()
  208. }
  209. }
  210.  
  211.  
  212.  
  213. /**
  214. * Debounce immediate.
  215. *
  216. * @param {integer} arg_milliseconds - number of milliseconds.
  217. */
  218. debounce_immediate(arg_milliseconds)
  219. {
  220. this._transformed_stream = this._transformed_stream.debounceImmediate(arg_milliseconds)
  221. return this
  222. }
  223. }