Parsing the Kafka protocol with Erlang. Pattern matching FTW!


This post is about how I took a raw response from a Kafka broker that looked like...

<<0,0,0,0,0,0,0,3,0,0,0,3,0,25,118,97,103,114,97,110,116,
45,117,98,117,110,116,117,45,112,114,101,99,105,115,101,
45,54,52,0,0,35,133,0,0,0,1,0,25,118,97,103,114,97,110,
116,45,117,98,117,110,116,117,45,112,114,101,99,105,115,
101,45,54,52,0,0,35,131,0,0,0,2,0,25,118,97,103,114,97,
110,116,45,117,98,117,110,116,117,45,112,114,101,99,105,
115,101,45,54,52,0,0,35,132,0,0,0,3,0,0,0,2,97,49,0,0,0,
2,0,0,0,0,0,0,0,0,0,3,0,0,0,1,0,0,0,3,0,0,0,1,0,0,0,3,0,
0,0,0,0,1,0,0,0,1,0,0,0,1,0,0,0,1,0,0,0,1,0,0,0,1,0,0,0,
2,97,50,0,0,0,2,0,0,0,0,0,0,0,0,0,1,0,0,0,1,0,0,0,1,0,0,
0,1,0,0,0,1,0,0,0,0,0,1,0,0,0,2,0,0,0,1,0,0,0,2,0,0,0,1,
0,0,0,2,0,0,0,2,97,51,0,0,0,2,0,0,0,0,0,0,0,0,0,3,0,0,0,
2,0,0,0,3,0,0,0,1,0,0,0,2,0,0,0,3,0,0,0,1,0,0,0,0,0,1,0,
0,0,1,0,0,0,2,0,0,0,1,0,0,0,2,0,0,0,2,0,0,0,1,0,0,0,2>>

...and patched an erlang kafka client to convert the blob into:

{metadata,0,
      [{broker,2,<<"vagrant-ubuntu-precise-64">>,9092},
       {broker,1,<<"vagrant-ubuntu-precise-64">>,9091},
       {broker,3,<<"vagrant-ubuntu-precise-64">>,9093}],
      [{topic,<<"a3">>,undefined,
              [{partition,1,0,1,
                          [{replica,2},{replica,1}],
                          [{isr,2},{isr,1}]},
               {partition,0,0,3,
                          [{replica,1},{replica,3}],
                          [{isr,1},{isr,3}]}]},
       {topic,<<"a2">>,undefined,
              [{partition,1,0,2,[{replica,2}],[{isr,2}]},
               {partition,0,0,1,[{replica,1}],[{isr,1}]}]},
       {topic,<<"a1">>,undefined,
              [{partition,1,0,1,[{replica,1}],[{isr,1}]},
               {partition,0,0,3,[{replica,3}],[{isr,3}]}]}]}

...by reading through the Kafka wire protocol, documented as:

Response => CorrelationId ResponseMessage
    CorrelationId => int32
    ResponseMessage => MetadataResponse

MetadataResponse => [Broker][TopicMetadata]
  Broker => NodeId Host Port
  NodeId => int32
  Host => string
  Port => int32
  TopicMetadata => TopicErrorCode TopicName [PartitionMetadata]
  TopicErrorCode => int16
  PartitionMetadata => PartitionErrorCode PartitionId Leader Replicas Isr
  PartitionErrorCode => int16
  PartitionId => int32
  Leader => int32
  Replicas => [int32]
  Isr => [int32]

from https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataResponse

A note about Kafka

First, a word about Kafka. Kafka is your infrastructure's commit log and a re-think of the message queue in a distributed world. More at http://kafka.apache.org, but I recommend starting by reading its wire protocol.

Messages are published to or consumed from topics. The number of partitions and the replication factor for a topic are set when it is created, and producers/consumers have to work with them. So in a sense, Kafka pushes the decision of which node to hit onto the client. To publish or fetch, requests must be sent to the broker that is currently acting as the leader for the given partition. This information can be queried from a broker directly by producers, and from zookeeper by consumer clients.

Clients maintain this themselves by querying metadata from any broker. Although a broker may not be the leader for a topic, it will have metadata about all topics, partitions and leaderships. Which broker is queried to get the metadata is again up to the client.

Kafka Clients in the wild

The Kafka client written in Go by Shopify http://godoc.org/github.com/Shopify/sarama has by far the best documentation and featureset outside Java/Scala/in-house LinkedIn clients IMHO.

Some clients still connect to zookeeper, if auto discovery for brokers is enabled. This is usually the case when no kafka hosts have been provided in a config.

Since I'm working on erlang these days ( reminder to catch up on ErlangFactory '14 videos! ), I noticed several existing erlang clients still connecting to zookeeper to get metadata of brokers instead of directly pinging brokers. From what I've understood from the friendly folks at #apache-kafka, this is more of a 0.7 paradigm. Since 0.8, the producer does not need to connect to zookeeper to find out which partition to write to. It may choose to - but this is up to the client.

I personally prefer querying the brokers for metadata, but unfortunately did not find a library that did this.

The closest implementation to just exchanging packets with a broker was in https://github.com/rmenke/ekafka. Here is how it encodes a query for metadata:

encode_metadata_request(CorrelationId, ClientId, Topics) ->
    MetadataRequest = encode_array([encode_string(Topic) || Topic <- Topics]),
    encode_request(?METADATA_REQUEST, CorrelationId, ClientId, MetadataRequest).

The decode functionality, however, was not implemented, which gave me a chance to explore the Kafka wire protocol.
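The helpers called by encode_metadata_request are not shown above, so here is a minimal sketch of what they might look like. This is my own guess based on the protocol guide, not ekafka's actual code: the module name, the API version of 0 and the decision to leave the int32 size prefix to the transport are all assumptions.

```erlang
-module(kafka_request_sketch).
-export([encode_string/1, encode_array/1, encode_request/4,
         encode_metadata_request/3]).

%% Hypothetical stand-ins for ekafka's helpers; the real ones may differ.
-define(METADATA_REQUEST, 3).   %% ApiKey of the Metadata API
-define(API_VERSION, 0).        %% assumed version

%% string => int16 length followed by that many bytes
encode_string(Bin) when is_binary(Bin) ->
    <<(byte_size(Bin)):16, Bin/binary>>.

%% array => int32 element count followed by the already encoded elements
encode_array(Encoded) when is_list(Encoded) ->
    <<(length(Encoded)):32, (iolist_to_binary(Encoded))/binary>>.

%% request => ApiKey ApiVersion CorrelationId ClientId RequestMessage
%% The int32 size prefix is left to the transport here,
%% for example gen_tcp's {packet, 4} option.
encode_request(ApiKey, CorrelationId, ClientId, Message) ->
    <<ApiKey:16, ?API_VERSION:16, CorrelationId:32,
      (encode_string(ClientId))/binary, Message/binary>>.

encode_metadata_request(CorrelationId, ClientId, Topics) ->
    Body = encode_array([encode_string(T) || T <- Topics]),
    encode_request(?METADATA_REQUEST, CorrelationId, ClientId, Body).
```

Asking for topic a1 with correlation id 0 and client id <<"c">> gives <<0,3,0,0,0,0,0,0,0,1,99,0,0,0,1,0,2,97,49>>: api key, api version, correlation id, client id, then an array of one topic name.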

Read the protocol

Let's revisit what the Kafka docs say about the Metadata API response:

MetadataResponse => [Broker][TopicMetadata]
  Broker => NodeId Host Port
  NodeId => int32
  Host => string
  Port => int32
  TopicMetadata => TopicErrorCode TopicName [PartitionMetadata]
  TopicErrorCode => int16
  PartitionMetadata => PartitionErrorCode PartitionId Leader Replicas Isr
  PartitionErrorCode => int16
  PartitionId => int32
  Leader => int32
  Replicas => [int32]
  Isr => [int32]

(Note to self: Someday I'm going to come up with an elegant way to describe header/payload/protocol information in a structured machine readable format!)

If you read only one paper/link on Kafka, make it the one on the wire protocol (the first link). It is a pleasure to read. That said, it makes several assumptions that are hard on new client developers. For example, the size of Host: the grammar just says string (another section explains that strings carry a length prefix, but it took me a while to find it). Or that [Broker] inside square brackets means an array: a 32-bit integer saying how many brokers follow, placed right before the brokers themselves.
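To make those two conventions concrete, here is a small sketch of decoders for a string and for an array of int32. The module and function names are made up for illustration; the only facts assumed from the protocol guide are the int16 length prefix for strings (with -1 meaning null) and the int32 count prefix for arrays.

```erlang
-module(kafka_prim_sketch).
-export([decode_string/1, decode_int32_array/1]).

%% string => int16 length, then that many bytes.
%% A length of -1 (16#FFFF on the wire) stands for a null string.
decode_string(<<16#FFFF:16, Rest/binary>>) ->
    {undefined, Rest};
decode_string(<<Len:16, Str:Len/binary, Rest/binary>>) ->
    {Str, Rest}.

%% [int32] => int32 element count, then that many int32 values.
decode_int32_array(<<Count:32, Rest/binary>>) ->
    decode_int32s(Count, Rest, []).

decode_int32s(0, Rest, Acc) ->
    {lists:reverse(Acc), Rest};
decode_int32s(N, <<I:32, Rest/binary>>, Acc) ->
    decode_int32s(N - 1, Rest, [I | Acc]).
```

Each function returns the decoded value together with the bytes it did not consume, so the next decoder can pick up where this one stopped. For example, decode_string(<<0,2,97,49>>) gives {<<"a1">>, <<>>}.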

Anyhow this is what I was presented with this evening to make sense of. The following is a valid metadata response from a kafka broker.

<<0,0,0,0,0,0,0,3,0,0,0,3,0,25,118,97,103,114,97,110,116,
45,117,98,117,110,116,117,45,112,114,101,99,105,115,101,
45,54,52,0,0,35,133,0,0,0,1,0,25,118,97,103,114,97,110,
116,45,117,98,117,110,116,117,45,112,114,101,99,105,115,
101,45,54,52,0,0,35,131,0,0,0,2,0,25,118,97,103,114,97,
110,116,45,117,98,117,110,116,117,45,112,114,101,99,105,
115,101,45,54,52,0,0,35,132,0,0,0,3,0,0,0,2,97,49,0,0,0,
2,0,0,0,0,0,0,0,0,0,3,0,0,0,1,0,0,0,3,0,0,0,1,0,0,0,3,0,
0,0,0,0,1,0,0,0,1,0,0,0,1,0,0,0,1,0,0,0,1,0,0,0,1,0,0,0,
2,97,50,0,0,0,2,0,0,0,0,0,0,0,0,0,1,0,0,0,1,0,0,0,1,0,0,
0,1,0,0,0,1,0,0,0,0,0,1,0,0,0,2,0,0,0,1,0,0,0,2,0,0,0,1,
0,0,0,2,0,0,0,2,97,51,0,0,0,2,0,0,0,0,0,0,0,0,0,3,0,0,0,
2,0,0,0,3,0,0,0,1,0,0,0,2,0,0,0,3,0,0,0,1,0,0,0,0,0,1,0,
0,0,1,0,0,0,2,0,0,0,1,0,0,0,2,0,0,0,2,0,0,0,1,0,0,0,2>>

Let the reverse engineering begin

Having worked with reverse engineering AIM/oscar packets, YMSG chat packets, MQTT packets, and others - this presented me with an interesting challenge.

What I knew was that my brokers were listening on ports 9091 through 9093, and that I had created a topic a1 a while back.

From the docs I knew I would be looking for the Broker first.

The first 4 bytes didn't make sense until I read this...

Response => CorrelationId ResponseMessage
  CorrelationId => int32
  ResponseMessage => MetadataResponse

The correlation id is analogous to a request id: you pass it in with the request and the broker hands it back in the response. Int32 meant I needed to skip 32 bits, or 4 bytes.

NOTE: this is how you parse the first 4 bytes in erlang (binary is a primitive data type in erlang)

<<First4Bytes:32, RemainingBytes/binary>>

In this case the first 4 bytes were

<<0,0,0,0>>

The next 4 bytes were

<<0,0,0,3>> 

So with this, I could annotate the first two fields:

  %  0,0,0,0,                                     %% correlation_id
  %  0,0,0,3,                                     %% 3 nodes
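Both fields can be pulled out in a single match. The sketch below is only an illustration (the module and function names are mine). Integer segments in erlang's bit syntax default to unsigned big-endian; I spell out /signed because the protocol guide defines Kafka's integers as signed.

```erlang
-module(kafka_header_sketch).
-export([peek/1]).

%% Split a metadata response into correlation id, broker count and the rest.
%% Segments are big-endian by default; /signed lets values like -1 decode
%% as -1 instead of 4294967295.
peek(<<CorrelationId:32/signed, BrokerCount:32/signed, Rest/binary>>) ->
    {CorrelationId, BrokerCount, Rest}.
```

Calling peek(<<0,0,0,0,0,0,0,3,0,0,0,3,0,25>>) returns {0, 3, <<0,0,0,3,0,25>>}.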

That's when I realised Kafka liked to follow the length-then-payload (headerlen, header) pattern used by most well-designed protocols. I started looking for patterns:

  • <<0,0,0,3>>-style patterns that could well be used to signify length

  • binary representations of port numbers like 9091, which maps to <<0,0,35,131>>. Try this on an erlang shell:

    <<9091:32>> = <<0,0,35,131>>.

  • repetitive patterns that could signify common hostnames
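This kind of pattern hunting can also be done programmatically. Here is a rough sketch using binary:matches/2 from the standard library; the module and function names are hypothetical, and it assumes ports are stored as 32-bit big-endian integers, which is what the dump suggests.

```erlang
-module(kafka_hunt_sketch).
-export([find_port/2, find_text/2]).

%% Byte offsets at which Port shows up as a 32-bit big-endian integer.
find_port(Blob, Port) ->
    [Pos || {Pos, _Len} <- binary:matches(Blob, <<Port:32>>)].

%% Byte offsets at which a known piece of text (e.g. a hostname) shows up.
find_text(Blob, Text) ->
    [Pos || {Pos, _Len} <- binary:matches(Blob, Text)].
```

On a toy blob such as <<0,0,0,3,0,2,"hi",0,0,35,131>>, find_port(Blob, 9091) returns [8] and find_text(Blob, <<"hi">>) returns [6]. The bytes just before each hit are good candidates for length fields.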

I was able to fill a few fields, and had questions about several (denoted by ?)

    % 4  bytes  %  0,0,0,0,              %% cor_id
    % 4  bytes %  0,0,0,3,              %% 3 nodes

    ??? 0,0,0,3,0,25,118,97,103,114,97,110,116,
45,117,98,117,110,116,117,45,112,114,101,99,105,115,101,
45,54,52,

    % 4 bytes  % 0,0,35,133,          %% = port 9093

    ??? 0,0,0,1,0,25,118,97,103,114,97,110,
116,45,117,98,117,110,116,117,45,112,114,101,99,105,115,
101,45,54,52,

    % 4 bytes % 0,0,35,131,           %% = port 9091

    ??? 0,0,0,2,0,25,118,97,103,114,97,
110,116,45,117,98,117,110,116,117,45,112,114,101,99,105,
115,101,45,54,52,

   % 4 bytes % 0,0,35,132,             %% = port 9092

    ?? 0,0,0,3,0,0,0,2,97,49,0,0,0,
    2,0,0,0,0,0,0,0,0,0,3,0,0,0,1,0,0,0,3,0,0,0,1,0,0,0,3,0,
    0,0,0,0,1,0,0,0,1,0,0,0,1,0,0,0,1,0,0,0,1,0,0,0,1,0,0,0,
    2,97,50,0,0,0,2,0,0,0,0,0,0,0,0,0,1,0,0,0,1,0,0,0,1,0,0,
    0,1,0,0,0,1,0,0,0,0,0,1,0,0,0,2,0,0,0,1,0,0,0,2,0,0,0,1,
    0,0,0,2,0,0,0,2,97,51,0,0,0,2,0,0,0,0,0,0,0,0,0,3,0,0,0,
    2,0,0,0,3,0,0,0,1,0,0,0,2,0,0,0,3,0,0,0,1,0,0,0,0,0,1,0,
    0,0,1,0,0,0,2,0,0,0,1,0,0,0,2,0,0,0,2,0,0,0,1,0,0,0,2>>

Going back to the docs, it was easy to dissect the Brokers half of the packet.

MetadataResponse => [Broker][TopicMetadata]
   Broker => NodeId Host Port
   NodeId => int32
   Host => string
   Port => int32

which mapped to

    % 4  %  0,0,0,0,                                     %% correlation id
    % 4  %  0,0,0,3,                                     %% 3 nodes

    % 4  %  0,0,0,3,                                      %% NODE1
    % 2  %  0,25,                                          %% HOST1 LEN
    % 25 %  118,97,103,114,97,.....52,          %% HOST1
    % 4  %  0,0,35,133,                                %% PORT1

    % 4  %  0,0,0,1,                                       %% NODE2
    % 2  %  0,25,                                           %% HOST2 LEN
    % 25 %  118,97,103,114,97,....,52,           %% HOST2
    % 4  %  0,0,35,131,                                 %% PORT2

    % 4  %  0,0,0,2,                                      %% NODE3
    % 2  %  0,25,                                          %% HOST3 LEN
    % 25 %  118,97,103,114,97,.....52,          %% HOST3
    % 4  %  0,0,35,132,                                %% PORT3

The Broker part of the protocol seemed wrapped up, at least visually understood. The next part would be trickier since there were nested structures. Multiple topics could have multiple partitions. And each partition could have multiple replicas and isr's.

TopicMetadata => TopicErrorCode TopicName [PartitionMetadata]
   TopicErrorCode => int16
   PartitionMetadata => PartitionErrorCode PartitionId Leader Replicas Isr
   PartitionErrorCode => int16
   PartitionId => int32
   Leader => int32
   Replicas => [int32]
   Isr => [int32]

So the above had to match the remaining binary blob that looked something like this...

<<0,0,0,3,0,0,0,2,97,49,0,0,0,
2,0,0,0,0,0,0,0,0,0,3,0,0,0,1,0,0,0,3,0,0,0,1,0,0,0,3,0,
0,0,0,0,1,0,0,0,1,0,0,0,1,0,0,0,1,0,0,0,1,0,0,0,1,0,0,0,
2,97,50,0,0,0,2,0,0,0,0,0,0,0,0,0,1,0,0,0,1,0,0,0,1,0,0,
0,1,0,0,0,1,0,0,0,0,0,1,0,0,0,2,0,0,0,1,0,0,0,2,0,0,0,1,
0,0,0,2,0,0,0,2,97,51,0,0,0,2,0,0,0,0,0,0,0,0,0,3,0,0,0,
2,0,0,0,3,0,0,0,1,0,0,0,2,0,0,0,3,0,0,0,1,0,0,0,0,0,1,0,
0,0,1,0,0,0,2,0,0,0,1,0,0,0,2,0,0,0,2,0,0,0,1,0,0,0,2>>

The next part took a while to correctly dissect because of the nested partitions and replicas.

    % 0,0,0,3,   % topics len
    % 0,0,         % topic1 error code
    % 0,2,         % topic1 name len
    % 97,49,      % topic1 name a1
    % 0,0,0,2,   % topic1 partitions len

    % 0,0,         % topic1 partition1 error code
    % 0,0,0,0,   % topic1 partition1
    % 0,0,0,3,   % topic1 partition1 leaderid
    % 0,0,0,1,   % topic1 partition1 replicas len
    % 0,0,0,3,   % topic1 partition1 replica1
    % 0,0,0,1,   % topic1 partition1 isr len
    % 0,0,0,3,   % topic1 partition1 isr1

    % 0,0,         % topic1 partition2 error code
    % 0,0,0,1,   % topic1 partition2
    % 0,0,0,1,   % topic1 partition2 leaderid
    % 0,0,0,1,   % topic1 partition2 replicas len
    % 0,0,0,1,   % topic1 partition2 replica1
    % 0,0,0,1,   % topic1 partition2 isr len
    % 0,0,0,1,   % topic1 partition2 isr1

    % 0,0,        % topic2 error code
    % 0,2,        % topic2 name len
    % 97,50,     % topic2 name a2
    % 0,0,0,2,  % topic2 partitions len

    % 0,0,        % topic2 partition1 error code
    % 0,0,0,0,  % topic2 partition1
    % 0,0,0,1,  % topic2 partition1 leaderid
    % 0,0,0,1,  % topic2 partition1 replicas len
    % 0,0,0,1,  % topic2 partition1 replica1
    % 0,0,0,1,  % topic2 partition1 isr len
    % 0,0,0,1,  % topic2 partition1 isr1

    % 0,0,        % topic2 partition2 error code
    % 0,0,0,1,  % topic2 partition2
    % 0,0,0,2,  % topic2 partition2 leaderid
    % 0,0,0,1,  % topic2 partition2 replicas len
    % 0,0,0,2,  % topic2 partition2 replica1
    % 0,0,0,1,  % topic2 partition2 isr len
    % 0,0,0,2,  % topic2 partition2 isr1

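    % 0,0,        % topic3 error code
    % 0,2,        % topic3 name len
    % 97,51,     % topic3 name a3
    % 0,0,0,2,  % topic3 partitions len

    % 0,0,        % topic3 partition1 error code
    % 0,0,0,0,  % topic3 partition1
    % 0,0,0,3,  % topic3 partition1 leaderid
    % 0,0,0,2,  % topic3 partition1 replicas len
    % 0,0,0,3,  % topic3 partition1 replica1
    % 0,0,0,1,  % topic3 partition1 replica2
    % 0,0,0,2,  % topic3 partition1 isr len
    % 0,0,0,3,  % topic3 partition1 isr1
    % 0,0,0,1,  % topic3 partition1 isr2

    % 0,0,        % topic3 partition2 error code
    % 0,0,0,1,  % topic3 partition2
    % 0,0,0,1,  % topic3 partition2 leaderid
    % 0,0,0,2,  % topic3 partition2 replicas len
    % 0,0,0,1,  % topic3 partition2 replica1
    % 0,0,0,2,  % topic3 partition2 replica2
    % 0,0,0,2,  % topic3 partition2 isr len
    % 0,0,0,1,  % topic3 partition2 isr1
    % 0,0,0,2   % topic3 partition2 isr2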

Code

With a clear picture of the chunks and their sizes, it was now time to write my erlang kafka metadata parser.

decode_metadata_response(Packet)->
    case Packet of
        <<CorrelationId:32, Rest/binary>> ->
            {Brokers, Topics, _ } = decode_to_brokers_and_topics(Rest),
            #metadata{ correlation_id = CorrelationId, brokers = Brokers, topics = Topics};
        _ ->
            #metadata{ }
    end.

decode_to_brokers_and_topics(Packet)->
    {Brokers,TopicsPacket} = decode_to_brokers(Packet),
    {Topics,Rest} = decode_to_topics(TopicsPacket),
    {Brokers,Topics,Rest}.

decode_to_brokers(Packet) ->
    case Packet of
        <<Len:32, Rest/binary>>->
            decode_to_brokers(Len,Rest,[]);
        _Else ->
            %% keep the {Brokers, Rest} shape that the caller matches on
            {[], Packet}
    end.
decode_to_brokers(0, Packet, Previous)->
    {Previous,Packet};
decode_to_brokers(Counter, Packet, Previous)->
    {Next,Rest} = decode_to_broker(Packet),
    decode_to_brokers(Counter-1, Rest, [Next|Previous]).
decode_to_broker(<<NodeId:32, HostLen:16, Host:HostLen/binary,Port:32,Rest/binary>>)->
    {#broker{ node_id = NodeId, host = Host, port = Port},
     Rest};
decode_to_broker(Rest)->
    {#broker{},Rest}.

decode_to_topics(Packet) ->
    case Packet of
         <<Len:32,Rest/binary>> ->
            decode_to_topics(Len,Rest,[]);
         _E ->
            %% keep the {Topics, Rest} shape that the caller matches on
            {[], Packet}
     end.
decode_to_topics(0, Packet, Previous)->
    {Previous, Packet};
decode_to_topics(Counter, Packet, Previous) ->
    {Next,Rest} = decode_to_topic(Packet),
    decode_to_topics(Counter-1, Rest, [Next|Previous]).
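%% TopicMetadata => TopicErrorCode (int16) TopicName (int16 length + bytes).
%% The error code is skipped here; matching one 32-bit NameLen would only
%% work while the error code happens to be 0.
decode_to_topic(<<_TopicErrorCode:16, NameLen:16, Name:NameLen/binary,PartitionsBinary/binary>>)->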
    {Partitions,Rest} = decode_to_partitions(PartitionsBinary),
    {#topic{ name = Name, partitions = Partitions}, Rest};
decode_to_topic(Rest)->
    {#topic{},Rest}.

decode_to_partitions(Packet) ->
    case Packet of
         <<Len:32,Rest/binary>> ->
            decode_to_partitions(Len,Rest,[]);
         _ ->
            %% keep the {Partitions, Rest} shape that the caller matches on
            {[], Packet}
     end.
decode_to_partitions(0, Packet, Previous)->
    {Previous, Packet};
decode_to_partitions(Counter, Packet, Previous) ->
    {Next,Rest} = decode_to_partition(Packet),
    decode_to_partitions(Counter-1, Rest, [Next|Previous]).
decode_to_partition(<<ErrorCode:16, Id:32, Leader:32, ReplicasBinary/binary>>)->
    {Replicas,Isrs,Rest} = decode_to_replicas_and_isrs(ReplicasBinary),
    {#partition{ id = Id, error_code = ErrorCode, leader = Leader, replicas = Replicas, isrs = Isrs },
     Rest};
decode_to_partition(Rest)->
    {#partition{},Rest}.

decode_to_replicas_and_isrs(Packet)->
    {Replicas,IsrsPacket} = decode_to_replicas(Packet),
    {Isrs,Rest} = decode_to_isrs(IsrsPacket),
    {Replicas,Isrs,Rest}.

decode_to_replicas(Packet) ->
    case Packet of
         <<Len:32,Rest/binary>> ->
            decode_to_replicas(Len,Rest,[]);
         _ ->
            %% keep the {Replicas, Rest} shape that the caller matches on
            {[], Packet}
     end.
decode_to_replicas(0, Packet, Previous)->
    {Previous, Packet};
decode_to_replicas(Counter, Packet, Previous) ->
    {Next,Rest} = decode_to_replica(Packet),
    decode_to_replicas(Counter-1, Rest, [Next|Previous]).
decode_to_replica(<<Id:32, Rest/binary>>)->
    {#replica{ id = Id },
     Rest};
decode_to_replica(Rest)->
    {#replica{},Rest}.

decode_to_isrs(Packet) ->
    case Packet of
         <<Len:32,Rest/binary>> ->
            decode_to_isrs(Len,Rest,[]);
         _ ->
            %% keep the {Isrs, Rest} shape that the caller matches on
            {[], Packet}
     end.
decode_to_isrs(0, Packet, Previous)->
    {Previous, Packet};
decode_to_isrs(Counter, Packet, Previous) ->
    {Next,Rest} = decode_to_isr(Packet),
    decode_to_isrs(Counter-1, Rest, [Next|Previous]).
decode_to_isr(<<Id:32, Rest/binary>>)->
    {#isr{ id = Id },
     Rest};
decode_to_isr(Rest)->
    {#isr{},Rest}.

Recursive

Syntax errors aside, when I passed in a packet into my decode function for the first time, things fell into place beautifully with all the functional and recursive goodness.

The pseudocode is roughly as follows:

decode_to_replicas_and_isrs(Packet)->
     %% i know that this blob begins with replicas, and ends with isr data
    {Replicas,IsrsPacket} = decode_to_replicas(Packet),
     %% IsrsPacket is basically the part of packet that is beyond the replica length
     %% as specified by the replica headers
    {Isrs,Rest} = decode_to_isrs(IsrsPacket),
     %% Rest in this case could be another partition as defined by the partitions header
    {Replicas,Isrs,Rest}.

 %% here is how the above function was used
decode_to_partition(<<ErrorCode:16, Id:32, Leader:32, ReplicasBinary/binary>>)->
    {Replicas,Isrs,Rest} = decode_to_replicas_and_isrs(ReplicasBinary),
    {#partition{ id = Id, error_code = ErrorCode, leader = Leader, replicas = Replicas, isrs = Isrs },
     Rest}.

Several of the functions look the same apart from the part that actually pattern-matches 1 specific entity. But for the sake of readability and clear distinction of records - I've left them as is. The fact that they are similar simply proves the code can be refactored further.
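As one possible direction for that refactoring, here is a sketch where the count-then-items loop is written once and the per-item decoder is passed in as a fun. This is not the code in my patch, the names are made up, and it returns plain integers in wire order rather than the reversed record lists shown in this post.

```erlang
-module(kafka_array_sketch).
-export([decode_array/2, decode_int32/1, decode_replicas_and_isrs/1]).

%% Generic array decoder: int32 count, then Count items.
%% DecodeOne is a fun taking a binary and returning {Item, Rest}.
decode_array(<<Count:32, Rest/binary>>, DecodeOne) ->
    decode_items(Count, Rest, DecodeOne, []).

decode_items(0, Rest, _DecodeOne, Acc) ->
    {lists:reverse(Acc), Rest};
decode_items(N, Bin, DecodeOne, Acc) ->
    {Item, Rest} = DecodeOne(Bin),
    decode_items(N - 1, Rest, DecodeOne, [Item | Acc]).

decode_int32(<<I:32, Rest/binary>>) ->
    {I, Rest}.

%% Replicas and Isr are both [int32], so they share one item decoder.
decode_replicas_and_isrs(Bin) ->
    {Replicas, IsrBin} = decode_array(Bin, fun decode_int32/1),
    {Isrs, Rest} = decode_array(IsrBin, fun decode_int32/1),
    {Replicas, Isrs, Rest}.
```

Brokers, topics and partitions could reuse decode_array/2 the same way, each with its own single-item decoder.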

Conclusion

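To round up, here is the input again:

> MetadataResponse = <<0,0,0,0,0,0,0,3,0,0,0,3,0,25,118,97,103,114,97,110,116,
45,117,98,117,110,116,117,45,112,114,101,99,105,115,101,
45,54,52,0,0,35,133,0,0,0,1,0,25,118,97,103,114,97,110,
116,45,117,98,117,110,116,117,45,112,114,101,99,105,115,
101,45,54,52,0,0,35,131,0,0,0,2,0,25,118,97,103,114,97,
110,116,45,117,98,117,110,116,117,45,112,114,101,99,105,
115,101,45,54,52,0,0,35,132,0,0,0,3,0,0,0,2,97,49,0,0,0,
2,0,0,0,0,0,0,0,0,0,3,0,0,0,1,0,0,0,3,0,0,0,1,0,0,0,3,0,
0,0,0,0,1,0,0,0,1,0,0,0,1,0,0,0,1,0,0,0,1,0,0,0,1,0,0,0,
2,97,50,0,0,0,2,0,0,0,0,0,0,0,0,0,1,0,0,0,1,0,0,0,1,0,0,
0,1,0,0,0,1,0,0,0,0,0,1,0,0,0,2,0,0,0,1,0,0,0,2,0,0,0,1,
0,0,0,2,0,0,0,2,97,51,0,0,0,2,0,0,0,0,0,0,0,0,0,3,0,0,0,
2,0,0,0,3,0,0,0,1,0,0,0,2,0,0,0,3,0,0,0,1,0,0,0,0,0,1,0,
0,0,1,0,0,0,2,0,0,0,1,0,0,0,2,0,0,0,2,0,0,0,1,0,0,0,2>>.

> ekafka_protocol:decode_metadata_response(MetadataResponse).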

And the resulting output

{metadata,0,
      [{broker,2,<<"vagrant-ubuntu-precise-64">>,9092},
       {broker,1,<<"vagrant-ubuntu-precise-64">>,9091},
       {broker,3,<<"vagrant-ubuntu-precise-64">>,9093}],
      [{topic,<<"a3">>,undefined,
              [{partition,1,0,1,
                          [{replica,2},{replica,1}],
                          [{isr,2},{isr,1}]},
               {partition,0,0,3,
                          [{replica,1},{replica,3}],
                          [{isr,1},{isr,3}]}]},
       {topic,<<"a2">>,undefined,
              [{partition,1,0,2,[{replica,2}],[{isr,2}]},
               {partition,0,0,1,[{replica,1}],[{isr,1}]}]},
       {topic,<<"a1">>,undefined,
              [{partition,1,0,1,[{replica,1}],[{isr,1}]},
               {partition,0,0,3,[{replica,3}],[{isr,3}]}]}]}
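Since a producer has to talk to the leader of a partition, the natural next step is to look that leader up in the decoded structure. The following is a sketch only: leader_for/3 and its error atoms are hypothetical, and it matches on the plain tuples printed above rather than on the record definitions, which are not shown in this post.

```erlang
-module(kafka_leader_sketch).
-export([leader_for/3]).

%% Tuple shapes assumed from the printed output:
%%   {metadata, CorrelationId, Brokers, Topics}
%%   {broker, NodeId, Host, Port}
%%   {topic, Name, _, Partitions}
%%   {partition, Id, ErrorCode, Leader, Replicas, Isrs}
leader_for({metadata, _CorrId, Brokers, Topics}, TopicName, PartitionId) ->
    case lists:keyfind(TopicName, 2, Topics) of
        {topic, _, _, Partitions} ->
            case lists:keyfind(PartitionId, 2, Partitions) of
                {partition, _, _ErrorCode, Leader, _, _} ->
                    %% the leader is a node id; resolve it to host and port
                    case lists:keyfind(Leader, 2, Brokers) of
                        {broker, _, Host, Port} -> {ok, {Host, Port}};
                        false -> {error, leader_not_available}
                    end;
                false ->
                    {error, unknown_partition}
            end;
        false ->
            {error, unknown_topic}
    end.
```

With the output above, leader_for(Metadata, <<"a1">>, 0) gives {ok, {<<"vagrant-ubuntu-precise-64">>, 9093}}, since partition 0 of a1 is led by broker 3.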

Suggestions, issues and patches are welcome on my repo

https://github.com/bosky101/ekafka

I also sent this as a pull request to ekafka, which I look forward to contributing to further

https://github.com/rmenke/ekafka/pull/1

Pattern matching FTW!

~Bosky | @bhaskerkode

PS: Hat-tip to rmenke for getting ekafka started, milindparikh for his inspiring erlang+kafka work and #erlang as always
