Last Updated: August 01, 2022 · coconup

Ruby: reading, parsing and forwarding large JSON files in small chunks (i.e. streaming)

I spent a couple of days figuring this out and I thought I'd share a tip to save someone else the hassle.

The problem: I have a Ruby API that has to read huge JSON files from third-party sources, parse them and do stuff with the data before forwarding it somewhere else. This can clearly turn into a memory nightmare if everything has to be done in memory.

There seems to be quite a lot of community knowledge about how to stream data in Ruby from server to client (e.g. here), but not so much about how to do the same when reading a JSON stream.

Although you will end up passing on the data through Ruby Enumerators in both cases, there is a key difference:

  • When pushing data, we know the produced JSON is sound and we just want to write it out to a stream without doing anything else (a minimal sketch follows this list);
  • When reading data, if we want to consume it without keeping the entire document in memory, we need to make sense of it as it comes in, without knowing how the rest of the document will unfold.
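For the push case, here is a minimal sketch, assuming a Rack-compatible server and with some_record_source standing in for wherever your records come from: a Rack response body only needs to respond to each, so an Enumerator can stream the JSON out without ever holding the whole document.

require 'oj'

# A Rack body is anything that responds to #each, so this Enumerator
# streams one record at a time instead of building the full array.
body = Enumerator.new do |yielder|
  yielder << '['
  some_record_source.each_with_index do |record, i|
    yielder << ',' if i.positive?
    yielder << Oj.dump(record, mode: :compat)
  end
  yielder << ']'
end

# e.g. in a Rack app: [200, { 'Content-Type' => 'application/json' }, body]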

I found a really nice article about how to consume XML streams, here. It even comes with a gem called Piperator, which lets you chain the steps of your pipeline in a clean and readable way.
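To give an idea of what that looks like, here is a tiny pipeline based on my reading of the Piperator README (the steps themselves are made up): each step is a callable that takes an enumerable and returns one, so nothing is materialised until the end of the chain is consumed.

require 'piperator'

# Each step receives an enumerable and returns a (lazy) enumerable.
triple = ->(values) { values.lazy.map { |n| n * 3 } }
stringify = ->(values) { values.lazy.map(&:to_s) }

Piperator.pipe(triple)
         .pipe(stringify)
         .call([1, 2, 3])
         .each { |s| puts s } # prints "3", "6", "9"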

With a bit of help from the gem, I tried implementing the same thing for JSON using the Oj parser, which I've read outperforms all the other Ruby JSON libraries out there.

Here's the example; the key things to check out are the run and yield_chunk methods:

require 'oj'
require 'piperator'

class JSONStreamParser < ::Oj::ScHandler
  def initialize
    @data_stream_writer = Oj::StringWriter.new
    @running = false
  end

  def run(enumerable_data_source, &block)
    return if @running

    @running = true
    @yielder = block

    # This turns your enumerator into an IO-like object, very handy
    # as Oj's sc_parse method wants an IO to read from.
    io = Piperator::IO.new(enumerable_data_source)
    Oj.sc_parse(self, io)
  end

  def hash_key(key)
    update_current_path(:hash_key, key)
    @data_stream_writer.push_key(key)
    @last_key = key
  end

  def hash_start
    @data_stream_writer.push_object(@last_key)
    @last_key = nil
  end

  def hash_set(h, key, value)
    @data_stream_writer.push_value(value, key)
  end

  def hash_end
    @data_stream_writer.pop
    yield_if_condition
  end

  def array_start
    @data_stream_writer.push_array(@last_key)
    @last_key = nil
  end

  def array_append(a, value)
    # The *_start callbacks return nil, so a nested container that has just
    # closed comes back through here as a nil value; skip it, since it has
    # already been written out by the push_* calls above.
    @data_stream_writer.push_value(value) unless !value && @array_ended
    @array_ended = false
  end

  def array_end
    @data_stream_writer.pop
    @array_ended = true
    yield_if_condition
  end

  def add_value(value)
    @data_stream_writer.push_value(value, @last_key)
    @last_key = nil
  end

  def error(message, line, column)
    p "ERROR: #{message} (line #{line}, column #{column})"
  end

  private

  def update_current_path(callback, key = nil)
    # Placeholder: track where we are in the document if your yield
    # condition needs it. The body was not shown in the original post.
  end

  def yield_if_condition
    # if whatever_logic
    #   @data_stream_writer.pop_all
    #   yield_chunk
    #   @data_stream_writer = Oj::StringWriter.new
    #   [ further logic depending on data structure ]
    # end
  end

  def yield_chunk
    @yielder.call @data_stream_writer.to_s
  end
end
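To make yield_if_condition concrete, here is one hypothetical way to fill it in for a document shaped like citylots.json (a root object whose features key holds one huge array): track how many containers are open and flush a batch every 500 features. The FeatureBatchParser name, the depth counter and the batch size are all my own additions, and, as the commented template above says, whatever re-opens the surrounding containers for the next chunk depends on your data structure and is left out here.

class FeatureBatchParser < JSONStreamParser
  BATCH_SIZE = 500 # arbitrary

  def initialize
    super
    @depth = 0    # how many hashes/arrays are currently open
    @features = 0 # feature objects closed since the last flush
  end

  def hash_start
    @depth += 1
    super
  end

  def hash_end
    @depth -= 1
    # a feature object closes back down to depth 2: root hash + features array
    @features += 1 if @depth == 2
    super
  end

  def array_start
    @depth += 1
    super
  end

  def array_end
    @depth -= 1
    super
  end

  private

  def yield_if_condition
    return unless @features >= BATCH_SIZE

    @features = 0
    @data_stream_writer.pop_all # close the still-open array and root hash
    yield_chunk                 # hand on a complete, valid JSON document
    @data_stream_writer = Oj::StringWriter.new
    # Note: a final partial batch still needs flushing after sc_parse
    # returns, and the next chunk starts from a blank writer.
  end
end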

require 'typhoeus'

# Fetch the file in chunks; each chunk is pushed into the enumerator
# as it arrives, so nothing is buffered beyond the current chunk.
http_fetch = Enumerator.new do |yielder|
  url = 'https://raw.githubusercontent.com/zemirco/sf-city-lots-json/master/citylots.json'
  request = Typhoeus::Request.new(url)
  request.on_body do |chunk|
    yielder << chunk
  end
  request.run
end

json_parse = Enumerator.new do |yielder|
  parser = JSONStreamParser.new
  parser.run(http_fetch) do |parsed_chunk|
    yielder << parsed_chunk
  end
end

json_parse.each { |c| puts c }
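Since the whole point is forwarding the data somewhere else, instead of just printing the chunks you could push them on as the body of a chunked HTTP POST. Here is a hedged sketch using only the standard library; the destination URL is made up, and whether the concatenated chunks are what the receiver expects depends entirely on how your yield_if_condition slices the document.

require 'net/http'

uri = URI('https://example.com/ingest') # hypothetical endpoint
reader, writer = IO.pipe

# Feed the parsed chunks into one end of a pipe from a separate thread...
producer = Thread.new do
  json_parse.each { |chunk| writer.write(chunk) }
ensure
  writer.close
end

# ...while Net::HTTP streams the other end out as the request body.
Net::HTTP.start(uri.host, uri.port, use_ssl: true) do |http|
  request = Net::HTTP::Post.new(uri)
  request['Content-Type'] = 'application/json'
  request['Transfer-Encoding'] = 'chunked'
  request.body_stream = reader
  http.request(request)
end

producer.join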

1 Response
If you had it on GitHub I'd appreciate a link. Question: where is the method update_current_path?

def hash_key(key)
  update_current_path(:hash_key, key)
  @data_stream_writer.push_key(key)
  @last_key = key
end

over 1 year ago
