Last Updated: August 01, 2022
· coconup

Ruby: reading, parsing and forwarding large JSON files in small chunks (i.e. streaming)

I spent a couple of days figuring this out and I thought I'd share a tip to save someone else the hassle.

The problem: I have a Ruby API that has to read huge JSON files from third-party sources, parse them and do stuff with the data before forwarding it somewhere else. This can clearly turn into a memory nightmare if everything has to be held in memory.

There seems to be quite a lot of community knowledge about how to stream data in Ruby from server to client (e.g. here) but not so much about how to do the same when reading a JSON stream.

Although you will end up passing on the data through Ruby Enumerators in both cases, there is a key difference:

  • When pushing data, we know the produced JSON is sound and we just want to write it out to a stream without doing anything else;
  • When reading data, if we want to consume it without keeping the entire document in memory, we need to make sense of it as it comes in without knowledge of how the document will further evolve.
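
To make the second point concrete, here is a minimal sketch using only Ruby's standard library. The chunked_source helper and the sample payload are made up for illustration; they stand in for a response body arriving in pieces.

```ruby
require 'json'

# Hypothetical helper: emits a string in fixed-size pieces, the way an
# HTTP client might hand over a response body.
def chunked_source(payload, chunk_size)
  Enumerator.new do |yielder|
    payload.scan(/.{1,#{chunk_size}}/m) { |piece| yielder << piece }
  end
end

# Returns true if the string is a complete JSON document on its own.
def parses_alone?(string)
  JSON.parse(string)
  true
rescue JSON::ParserError
  false
end

payload = JSON.generate('lots' => [{ 'id' => 1 }, { 'id' => 2 }])
chunks  = chunked_source(payload, 10).to_a

# A single chunk is only a fragment, so a whole-document parser rejects it;
# the document only makes sense once every piece has arrived.
puts "first chunk parses on its own: #{parses_alone?(chunks.first)}"
puts "all chunks joined parse:       #{parses_alone?(chunks.join)}"
```

A streaming parser sidesteps this by reporting events (object start, key, value, and so on) as soon as they can be recognized, instead of waiting for the closing bracket of the whole document.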

I found a really nice article about how to consume XML streams, here. It even comes with a gem called Piperator which allows you to chain steps in your pipeline in a clean and readable way.
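
The chaining idea can be approximated in plain Ruby: each step is a callable that takes an enumerable and returns a lazy one, and steps are composed with Proc#>> (Ruby 2.6 or later). This only illustrates the concept and is not Piperator's API; the step names are invented for the example.

```ruby
# Each step receives an enumerable and returns a lazy enumerable, so no
# step forces the whole input into memory.
STRIP_BLANKS = ->(lines) { lines.lazy.reject { |line| line.strip.empty? } }
UPCASE       = ->(lines) { lines.lazy.map(&:upcase) }
TAG_LENGTH   = ->(lines) { lines.lazy.map { |line| "#{line} (#{line.size})" } }

# Proc#>> composes left to right: the output of one step feeds the next.
PIPELINE = STRIP_BLANKS >> UPCASE >> TAG_LENGTH

source = Enumerator.new do |yielder|
  ['alpha', '', 'beta', '   ', 'gamma'].each { |line| yielder << line }
end

# Items are pulled through all three steps one at a time.
PIPELINE.call(source).each { |line| puts line }
```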

With a bit of help from the gem, I tried implementing the same in JSON using the Oj JSON parser, which I read outperforms all others out there.
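
The help from the gem, in the code further down, is Piperator::IO, which wraps an enumerator in an IO-like object. Roughly, that means buffering incoming chunks and serving read(length) calls from the buffer. The class below is a simplified stand-in written for this tip (EnumeratorReader is a made-up name), not Piperator's implementation.

```ruby
# Simplified stand-in for an enumerator-backed IO: pulls chunks from the
# enumerator only when the buffer cannot satisfy a read.
class EnumeratorReader
  def initialize(enumerator)
    @enumerator = enumerator
    @buffer = String.new # binary buffer, so lengths are counted in bytes
    @exhausted = false
  end

  # Mirrors IO#read: with a length, returns up to that many bytes or nil
  # at end of input; without one, returns everything that is left.
  def read(length = nil)
    fill_buffer(length)

    if length.nil?
      data = @buffer
      @buffer = String.new
      return data
    end

    return nil if @buffer.empty?

    @buffer.slice!(0, length)
  end

  private

  # Keeps pulling chunks until the request can be served or input ends.
  def fill_buffer(length)
    until @exhausted || (length && @buffer.bytesize >= length)
      begin
        @buffer << @enumerator.next.b
      rescue StopIteration
        @exhausted = true
      end
    end
  end
end

reader = EnumeratorReader.new(['{"a":', '[1,2', ',3]}'].each)
puts reader.read(4) # served from the first chunk only
puts reader.read    # pulls the remaining chunks and returns the rest
```

The point of the wrapper is that the parser decides how much to read and when, while the source keeps producing chunks at its own pace.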

Here's the example, where the key things to check out are the run and yield_chunk methods:

require 'oj'
require 'piperator'
require 'typhoeus'

class JSONStreamParser < ::Oj::ScHandler
  def initialize
    @data_stream_writer = Oj::StringWriter.new
    @running = false
  end

  def run(enumerable_data_source, &block)
    # Guard against starting the same parser twice.
    return if @running

    @running = true
    @yielder = block

    # This turns your enumerator into an IO object, which is very handy
    # because Oj's sc_parse method expects one.
    io = Piperator::IO.new(enumerable_data_source)
    Oj.sc_parse(self, io)
  end

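  def hash_key(key)
    # update_current_path is not defined in this snippet (it tracks where
    # the parser currently is in the document); it is only called if you
    # implement it yourself.
    update_current_path(:hash_key, key) if respond_to?(:update_current_path, true)
    @data_stream_writer.push_key(key)
    @last_key = key
  end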

  def hash_start
    @data_stream_writer.push_object(@last_key)
    @last_key = nil
  end

  def hash_set(h, key, value)
    @data_stream_writer.push_value(value, key)
  end

  def hash_end
    @data_stream_writer.pop
    yield_if_condition
  end

  def array_start
    @data_stream_writer.push_array(@last_key)
    @last_key = nil
  end

  def array_append(a, value)
    @data_stream_writer.push_value(value) unless !value && @array_ended
    @array_ended = false
  end

  def array_end
    @data_stream_writer.pop
    @array_ended = true
    yield_if_condition
  end

  def add_value(value)
    @data_stream_writer.push_value(value, @last_key)
    @last_key = nil
  end

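  def error(message, line, column)
    # Report parse errors on stderr, including where they occurred.
    warn "ERROR: #{message} (line #{line}, column #{column})"
  end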

  private
  def yield_if_condition
    # if whatever_logic
    #   @data_stream_writer.pop_all
    #   yield_chunk
    #   @data_stream_writer = Oj::StringWriter.new
    #   [ further logic depending on data structure ]
    # end
  end

  def yield_chunk
    @yielder.call @data_stream_writer.to_s
  end
end

http_fetch = Enumerator.new do |yielder|
  url = "https://raw.githubusercontent.com/zemirco/sf-city-lots-json/master/citylots.json"
  request = Typhoeus::Request.new(url)
  request.on_body do |chunk|
    yielder << chunk
  end
  request.run
end

json_parse = Enumerator.new do |yielder|
  parser = JSONStreamParser.new
  parser.run(http_fetch) do |parsed_chunk|
    yielder << parsed_chunk
  end
end

# Use each rather than map: we only want the side effect of printing each chunk.
json_parse.each { |chunk| puts chunk }
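
What goes inside yield_if_condition depends on the shape of your document. As an illustration of one possible condition ("a value just closed at nesting depth 1"), here is a standalone sketch that does not use Oj at all: it scans the incoming chunks character by character, tracks the nesting depth and emits every complete element of a top-level array. The top_level_elements helper is hypothetical, and this hand-rolled scanner only handles elements that are objects or arrays.

```ruby
require 'json'

# Hypothetical helper: lazily yields each object/array element of a
# top-level JSON array as soon as its closing bracket has arrived.
def top_level_elements(chunks)
  Enumerator.new do |yielder|
    depth = 0          # number of currently open brackets outside strings
    in_string = false  # true while inside a JSON string
    escaped = false    # true right after a backslash inside a string
    current = +''      # text of the element being collected

    chunks.each do |chunk|
      chunk.each_char do |char|
        depth += 1 if !in_string && (char == '{' || char == '[')
        # Depth 1 is the outer array itself; anything deeper is element text.
        current << char if depth >= 2

        if in_string
          if escaped
            escaped = false
          elsif char == '\\'
            escaped = true
          elsif char == '"'
            in_string = false
          end
        elsif char == '"'
          in_string = true
        elsif char == '}' || char == ']'
          depth -= 1
          # Back at depth 1 means one element is complete: emit it and
          # start over with an empty buffer.
          if depth == 1
            yielder << current
            current = +''
          end
        end
      end
    end
  end
end

chunks = ['[{"id":1,"na', 'me":"a]b"},{"id"', ':2,"tags":["x","y"]}', ']']
top_level_elements(chunks).each { |json| p JSON.parse(json) }
```

In the Oj handler the same decision would be made in hash_end and array_end, using whatever depth or path tracking you keep, with yield_chunk playing the role of yielder here.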

1 Response

If you have it on GitHub, I'd appreciate a link. My question is: where is the update_current_path method defined?

def hash_key(key)
  update_current_path(:hash_key, key)
  @data_stream_writer.push_key(key)
  @last_key = key
end

over 1 year ago