Skip to content

A Kafka consumer framework. Like Rack but for Kafka.

License

Notifications You must be signed in to change notification settings

sspinc/glass-octopus

Repository files navigation

Glass Octopus

A Kafka consumer framework. Like Rack but for Kafka.

Installation

Add this line to your application's Gemfile:

gem 'glass_octopus'

And then execute:

$ bundle

Or install it yourself as:

$ gem install glass_octopus

This gem requires Ruby 2.1 or higher.

Getting started

Pick your adapter:

  • ruby-kafka

      # in your Gemfile
      gem "glass_octopus"
      gem "ruby-kafka"
    

Currently only ruby-kafka is supported out of the box. If you need to use another adapter you can pass a class to config.adapter. See documentation for GlassOctopus::Configuration#adapter.

# in app.rb
require "bundler/setup"
require "glass_octopus"

app = GlassOctopus.build do
  use GlassOctopus::Middleware::CommonLogger

  run Proc.new { |ctx|
    puts "Got message: #{ctx.message.key} => #{ctx.message.value}"
  }
end

GlassOctopus.run(app) do |config|
  config.adapter :ruby_kafka do |kafka|
    kafka.broker_list = %[localhost:9092]
    kafka.topic       = "mytopic"
    kafka.group_id    = "mygroup"
    kafka.client_id   = "myapp"
  end
end

Run it with bundle exec ruby app.rb

For more examples look into the example directory.

For the API documentation please see the documentation site

Handling Avro messages with Schema Registry

Glass Octopus can be used with Avro messages validated against a schema. For this, you need a running Schema Registry service. You also need to have the avro_turf gem installed.

# in your Gemfile
gem "avro_turf"

Add the AvroParser middleware with the Schema Registry URL to your app.

# in app.rb
app = GlassOctopus.build do
  use GlassOctopus::Middleware::AvroParser, "http://schema_registry_url:8081"
  # ...
end

Supported middleware

  • ActiveRecord

    Return any active connection to the pool after the message has been processed.

    app = GlassOctopus.build do
      use GlassOctopus::Middleware::ActiveRecord
      # ...
    end
  • New Relic

    Record message processing as background transactions. Also captures uncaught exceptions.

    app = GlassOctopus.build do
      use GlassOctopus::Middleware::NewRelic, MyConsumer
      # ...
    end
  • Sentry

    Report uncaught exceptions to Sentry.

    app = GlassOctopus.build do
      use GlassOctopus::Middleware::Sentry
      # ...
    end
  • Common logger

    Log processed messages and runtime of the processing.

    app = GlassOctopus.build do
      use GlassOctopus::Middleware::CommonLogger
      # ...
    end
  • Parse messages as JSON

    Parse message value as JSON. The resulting hash is placed in context.params.

    app = GlassOctopus.build do
      use GlassOctopus::Middleware::JsonParser
      # ...
      run MyConsumer
    end
    
    class MyConsumer
      def call(ctx)
        puts ctx.params # message value parsed as JSON
        puts ctx.message # Raw unaltered message
      end
    end

    Optionally you can specify a class to be instantiated with the message hash.

    app = GlassOctopus.build do
      use GlassOctopus::Middleware::JsonParser, class: MyMessage
      run MyConsumer
    end
    
    class MyMessage
      def initialize(attributes)
        attributes.each { |k,v| public_send("#{k}=", v) }
      end
    end

Development

Install docker and docker-compose to run Kafka and Zookeeper for tests.

Start Kafka and Zookeeper

$ docker-compose up

Run all tests including integration tests:

$ rake test:all

Running tests without integration tests:

$ rake # or rake test

When you are done run docker-compose down to clean up docker containers.

License

The gem is available as open source under the terms of the MIT License.