Managing data sources
Connect a kafka topic
Orbital can read data from a Kafka topic to stream data as part of a query.
Before you begin
When declaring a topic in Taxi, you need to define a connection to the Kafka broker in your connections.conf
file.
Read more about defining Kafka connections here.
Declaring a topic in Taxi
Kafka topics are declared in taxi as simple Operations which return a Stream of data.
// 1: Add the required imports
import io.orbital.kafka.KafkaService
import io.orbital.kafka.KafkaOperation
// 2: Annotate the service as a `KafkaService`.
@KafkaService( connectionName = "moviesConnection" )
service MyKafkaService {
// 3: Annotate the operation as a `@KafkaOperation`
@KafkaOperation( topic = "hello-worlds", offset = "earliest" )
// The topic is declared as a stream, and returns a Stream<>
stream streamGoodThings():Stream<HelloWorld>
}
- Add the required imports
- Annotate the service with
@KafkaService
- The
connectionName
parameter should match a connection listed in the connections config file.
- The
- Annotate the operation as a
@KafkaOperation
, specifying the topic and offset. - The return type should be a
Stream<>
of the defined message type.
Controlling deserialization
Message deserialization is defined by the model type being exposed. By default, models are expected to be JSON.
However, this can be controlled by annotating the model with a format annotation. Currently, formats are limited to protobuf only, but we are working on adding more.
See Using Protobuf for more information.