// match.js (4.9 KB)

  'use strict'

  const check = require('check-types')
  const DataStream = require('./datastream')
  const events = require('./events')
  const Hoopy = require('hoopy')
  const walk = require('./walk')

  // Size of the match buffer used when `options.bufferLength` is not provided.
  const DEFAULT_BUFFER_LENGTH = 1024

  module.exports = match

  /**
   * Public function `match`.
   *
   * Asynchronously parses a stream of JSON data, returning a stream of items
   * that match the argument. Note that if a value is `null`, it won't be matched
   * because `null` is used to signify end-of-stream in node.
   *
   * @param stream:         Readable instance representing the incoming JSON.
   *
   * @param selector:       Regular expression, string or predicate function used to
   *                        identify matches. If a regular expression or string is
   *                        passed, only property keys are tested. If a predicate is
   *                        passed, both the key and the value are passed to it as
   *                        arguments, followed by the current nesting depth.
   *                        Regular expressions carrying the `g` or `y` flag have
   *                        their `lastIndex` reset before every test, so each key
   *                        is always tested from its first character.
   *
   * @option numbers:       Boolean, indicating whether numerical keys (e.g. array
   *                        indices) should be coerced to strings before testing the
   *                        match. Only applies if the `selector` argument is a string
   *                        or regular expression.
   *
   * @option ndjson:        Set this to true to parse newline-delimited JSON,
   *                        default is `false`.
   *
   * @option yieldRate:     The number of data items to process per timeslice,
   *                        default is 16384.
   *
   * @option bufferLength:  The length of the match buffer, default is 1024.
   *
   * @option highWaterMark: If set, will be passed to the readable stream constructor
   *                        as the value for the highWaterMark option.
   *
   * @option Promise:       The promise constructor to use, defaults to bluebird.
   *
   * @returns The `DataStream` instance that matched items are pushed to. Errors
   *          from the walker are re-emitted on it as `error` and `dataError`.
   **/
  function match (stream, selector, options = {}) {
    // Stack of the arrays/objects currently being assembled, innermost last.
    const scopes = []
    // Stack of property names that are still waiting for their value.
    const properties = []
    const emitter = walk(stream, options)
    // Matched values that have not been handed to `results` yet.
    const matches = new Hoopy(options.bufferLength || DEFAULT_BUFFER_LENGTH)
    let streamOptions
    const { highWaterMark } = options
    if (highWaterMark) {
      streamOptions = { highWaterMark }
    }
    const results = new DataStream(read, streamOptions)

    // `resume` holds the function returned by `emitter.pause()` while the walker
    // is paused, and is falsy otherwise.
    let selectorFunction, selectorString, resume
    let coerceNumbers = false
    // Stays true until `read` has been called for the first time; nothing is
    // pushed to `results` before then.
    let awaitPush = true
    let isEnded = false
    // `index` is the position of the oldest buffered match, `length` the number
    // of buffered matches.
    let length = 0
    let index = 0

    // Normalise the selector: exactly one of `selectorFunction`, `selectorString`
    // or `selector` (a RegExp) is set after this block.
    if (check.function(selector)) {
      selectorFunction = selector
      selector = null
    } else {
      coerceNumbers = !! options.numbers

      if (check.string(selector)) {
        check.assert.nonEmptyString(selector)
        selectorString = selector
        selector = null
      } else {
        check.assert.instanceStrict(selector, RegExp)
      }
    }

    emitter.on(events.array, array)
    emitter.on(events.object, object)
    emitter.on(events.property, property)
    emitter.on(events.endArray, endScope)
    emitter.on(events.endObject, endScope)
    emitter.on(events.string, value)
    emitter.on(events.number, value)
    emitter.on(events.literal, value)
    emitter.on(events.end, end)
    emitter.on(events.error, error)
    emitter.on(events.dataError, dataError)

    return results

    // Read callback handed to the DataStream constructor.
    function read () {
      if (awaitPush) {
        awaitPush = false

        // The walker already finished before the first read: flush whatever was
        // buffered and signal end-of-stream.
        if (isEnded) {
          if (length > 0) {
            after()
          }

          return endResults()
        }
      }

      if (resume) {
        // Clear `resume` before calling it so that `after` (and any `pause`
        // triggered while resuming) sees a consistent state.
        const resumeCopy = resume
        resume = null
        resumeCopy()
        after()
      }
    }

    // Moves buffered matches into `results` until the buffer is drained or a
    // push reports that the stream wants no more data (which pauses the walker).
    function after () {
      if (awaitPush || resume) {
        return
      }

      let i

      for (i = 0; i < length && ! resume; ++i) {
        if (! results.push(matches[i + index])) {
          pause()
        }
      }

      if (i === length) {
        index = length = 0
      } else {
        length -= i
        index += i
      }
    }

    function pause () {
      resume = emitter.pause()
    }

    // Pushes the end-of-stream marker, but only once `read` has been called.
    function endResults () {
      if (! awaitPush) {
        results.push(null)
      }
    }

    function array () {
      scopes.push([])
    }

    function object () {
      scopes.push({})
    }

    function property (name) {
      properties.push(name)
    }

    // A finished array/object is itself a value of its parent scope.
    function endScope () {
      value(scopes.pop())
    }

    // Stores `v` in the enclosing scope (if any) and tests it against the selector.
    function value (v) {
      let key

      if (scopes.length > 0) {
        const scope = scopes[scopes.length - 1]

        if (Array.isArray(scope)) {
          // Next free index of the array being built.
          key = scope.length
        } else {
          key = properties.pop()
        }

        scope[key] = v
      }

      // `null` cannot be emitted because it would end the results stream.
      if (v === null) {
        return
      }

      if (selectorFunction) {
        if (selectorFunction(key, v, scopes.length)) {
          push(v)
        }
      } else {
        if (coerceNumbers && typeof key === 'number') {
          key = key.toString()
        }

        if (isKeyMatch(key)) {
          push(v)
        }
      }
    }

    // Tests a key against the string or regular expression selector.
    function isKeyMatch (key) {
      if (selectorString) {
        return selectorString === key
      }

      if (! selector) {
        return false
      }

      // `RegExp.prototype.test` resumes from `lastIndex` for global and sticky
      // expressions; without this reset a match on one key would make the test
      // of the following key start mid-string and wrongly fail.
      if (selector.global || selector.sticky) {
        selector.lastIndex = 0
      }

      return selector.test(key)
    }

    // Buffers a matched value, pausing the walker when this item fills the buffer.
    function push (v) {
      if (length + 1 === matches.length) {
        pause()
      }

      matches[index + length++] = v

      after()
    }

    function end () {
      isEnded = true
      endResults()
    }

    function error (e) {
      results.emit('error', e)
    }

    function dataError (e) {
      results.emit('dataError', e)
    }
  }