Ruby: reading, parsing and forwarding large JSON files in small chunks (i.e. streaming)
I spent a couple of days figuring this out and I thought I'd share a tip to save someone else the hassle.
The problem: I have a Ruby API that has to read huge JSON files from third-party sources, parse them and do stuff with the data before forwarding it somewhere else. This can clearly turn into a memory nightmare if everything has to be done in memory.
There seems to be quite a lot of community knowledge about how to stream data in Ruby from server to client (e.g. here) but not so much about how to do the same when reading a JSON stream.
Although you will end up passing on the data through Ruby Enumerators in both cases, there is a key difference:
- When pushing data, we know the produced JSON is sound and we just want to write it out to a stream without doing anything else;
- When reading data, if we want to consume it without keeping the entire document in memory, we need to make sense of it as it comes in without knowledge of how the document will further evolve.
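The pull side of this can be sketched with nothing but plain Ruby Enumerators: the source yields raw chunks one at a time, and nothing is read until the consumer asks for the next piece. A minimal stdlib-only sketch (the `StringIO` source here is a stand-in for an HTTP body arriving in chunks):

```ruby
require 'stringio'

# Hypothetical source: stands in for an HTTP response body.
io = StringIO.new('{"a":1,"b":[2,3]}')

chunked_source = Enumerator.new do |yielder|
  while (chunk = io.read(4)) # read 4 bytes at a time
    yielder << chunk
  end
end

first = chunked_source.next # only the first chunk is pulled
# first == '{"a"' ; the rest of the document has not been read yet
```

The point is that the consumer drives the iteration: each `next` pulls exactly one chunk, so the full document never needs to sit in memory at once.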
I found a really nice article about how to consume XML streams, here. It even comes with a gem called Piperator which allows you to chain steps in your pipeline in a clean and readable way.
With a bit of help from the gem, I tried implementing the same in JSON using the Oj JSON parser, which reportedly outperforms the alternatives.
Here's my example, where the key things to check out are the run and yield_chunk methods:
require 'oj'
require 'piperator'
require 'typhoeus'

class JSONStreamParser < ::Oj::ScHandler
  def initialize
    @data_stream_writer = Oj::StringWriter.new
    @running = false
  end

  def run(enumerable_data_source, &block)
    return if @running
    @running = true
    @yielder = block
    # This turns your enumerator into an IO class, very handy
    # as Oj's sc_parse method wants an IO object.
    io = Piperator::IO.new(enumerable_data_source)
    Oj.sc_parse(self, io)
  end

  def hash_key(key)
    update_current_path(:hash_key, key)
    @data_stream_writer.push_key(key)
    @last_key = key
  end

  def hash_start
    @data_stream_writer.push_object(@last_key)
    @last_key = nil
  end

  def hash_set(h, key, value)
    @data_stream_writer.push_value(value, key)
  end

  def hash_end
    @data_stream_writer.pop
    yield_if_condition
  end

  def array_start
    @data_stream_writer.push_array(@last_key)
    @last_key = nil
  end

  def array_append(a, value)
    @data_stream_writer.push_value(value) unless !value && @array_ended
    @array_ended = false
  end

  def array_end
    @data_stream_writer.pop
    @array_ended = true
    yield_if_condition
  end

  def add_value(value)
    @data_stream_writer.push_value(value, @last_key)
    @last_key = nil
  end

  def error(message, line, column)
    warn "ERROR: #{message} (line #{line}, column #{column})"
  end

  private

  def yield_if_condition
    # if whatever_logic
    #   @data_stream_writer.pop_all
    #   yield_chunk
    #   @data_stream_writer = Oj::StringWriter.new
    #   [ further logic depending on data structure ]
    # end
  end

  def yield_chunk
    @yielder.call @data_stream_writer.to_s
  end

  # Placeholder: hash_key calls this helper, but its body depends on your
  # data; track the current location in the document here if your chunking
  # logic needs it. A no-op keeps the example runnable.
  def update_current_path(node_type, key); end
end
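To get a feel for the order in which these callbacks fire, here is a stdlib-only simulation (no Oj required). Oj drives the callbacks directly from the raw byte stream; this sketch just walks an already-parsed structure and records the same event names, purely to illustrate the sequence:

```ruby
# Walk a nested structure and record SAX-style events in order.
def emit_events(node, events = [], key = nil)
  case node
  when Hash
    events << [:hash_start, key]
    node.each do |k, v|
      events << [:hash_key, k]
      emit_events(v, events, k)
    end
    events << [:hash_end]
  when Array
    events << [:array_start, key]
    node.each { |v| emit_events(v, events) }
    events << [:array_end]
  else
    events << [:add_value, node]
  end
  events
end

events = emit_events({ 'a' => [1, 2] })
# events == [[:hash_start, nil], [:hash_key, 'a'], [:array_start, 'a'],
#            [:add_value, 1], [:add_value, 2], [:array_end], [:hash_end]]
```

Note how the container callbacks fire before their contents are complete; that is why the handler above has to buffer into a StringWriter and decide for itself when a chunk is ready to yield.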
http_fetch = Enumerator.new do |yielder|
  url = "https://raw.githubusercontent.com/zemirco/sf-city-lots-json/master/citylots.json"
  request = Typhoeus::Request.new(url)
  request.on_body do |chunk|
    yielder << chunk
  end
  request.run
end

json_parse = Enumerator.new do |yielder|
  parser = JSONStreamParser.new
  parser.run(http_fetch) do |parsed_chunk|
    yielder << parsed_chunk
  end
end

json_parse.each { |c| puts c }
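The same pull-based pipeline extends naturally with further stages, each one an Enumerator that pulls from the previous, so only one chunk is in flight at a time (this is the pattern Piperator formalizes). A stdlib sketch, with hypothetical names not taken from the post:

```ruby
# Upstream stage: stands in for json_parse yielding parsed chunks.
source = Enumerator.new do |y|
  ['{"id":1}', '{"id":2}'].each { |chunk| y << chunk }
end

# Downstream stage: e.g. forward each parsed chunk somewhere else.
forwarder = Enumerator.new do |y|
  source.each { |parsed_chunk| y << "POST #{parsed_chunk}" }
end

forwarder.to_a
# => ["POST {\"id\":1}", "POST {\"id\":2}"]
```

Each stage only does work when the one after it asks for data, so memory use stays bounded no matter how large the original document is.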
Written by coconup
1 Response
If you had it on GitHub I'd appreciate a link. Question is: where is the method update_current_path?
def hash_key(key)
  update_current_path(:hash_key, key)
  @data_stream_writer.push_key(key)
  @last_key = key
end