Managing data sources

Connect a kafka topic

Orbital can read data from a Kafka topic to stream data as part of a query.

Before you begin

When declaring a topic in Taxi, you need to define a connection to the Kafka broker in your connections.conf file.

Read more about defining Kafka connections here.

Declaring a topic in Taxi

Kafka topics are declared in taxi as simple Operations which return a Stream of data.

// 1: Add the required imports
import io.orbital.kafka.KafkaService
import io.orbital.kafka.KafkaOperation

// 2: Annotate the service as a `KafkaService`.
@KafkaService( connectionName = "moviesConnection" )
service MyKafkaService {

   // 3: Annotate the operation as a `@KafkaOperation`
   @KafkaOperation( topic = "hello-worlds", offset = "earliest" )
   // The topic is declared as a stream, and returns a Stream<>
   stream streamGoodThings():Stream<HelloWorld>
}
  1. Add the required imports
  2. Annotate the service with @KafkaService
  3. Annotate the operation as a @KafkaOperation, specifying the topic and offset.
  4. The return type should be a Stream<> of the defined message type.

Controlling deserialization

Message deserialization is defined by the model type being exposed. By default, models are expected to be JSON.

However, this can be controlled by annotating the model with a format annotation. Currently, formats are limited to protobuf only, but we are working on adding more.

See Using Protobuf for more information.

Previous
Databases
Next
Connecting AWS services