Managing data sources

Kafka as a data source

Orbital can both read data from a Kafka topic, streaming it as part of a query, and write data back to Kafka.

Defining a connection to your Kafka broker

Kafka connections are stored in your connections.conf config file, under the kafka element.

The connection specifies how to connect to a broker - but not the specifics of actual topics. (Those are specified in the @KafkaOperation() metadata on a Taxi operation definition.)

kafka {
   my-kafka-connection {
      connectionName=my-kafka-connection
      connectionParameters {
         brokerAddress="localhost:29092,localhost:39092"
         groupId="orbital"
      }
   }
   another-kafka-connection {
      connectionName=another-kafka-connection
      connectionParameters {
         brokerAddress="localhost:29092,localhost:39092"
         groupId="orbital"
      }
   }
}

The following configuration options are supported under connectionParameters:

Config option   Purpose
brokers         A comma-separated list of broker addresses when connecting to the Kafka broker
groupId         The groupId to use when connecting to a topic

Additional Kafka connection properties

In addition to brokers and groupId (which are shorthand for bootstrap.servers and group.id respectively), you can provide any of the Kafka consumer and producer config settings.

For example, to define a secure connection:

kafka {
   my-kafka-connection {
      connectionName=my-kafka-connection
      connectionParameters {
         brokerAddress="localhost:29092,localhost:39092"
         groupId="orbital"
         "security.protocol"="SASL_PLAINTEXT"
         "sasl.mechanism"="PLAIN"
         "sasl.jaas.config"="org.apache.kafka.common.security.plain.PlainLoginModule required username='myKafkaUser' password="${KAFKA_PASSWORD}";"
      }
   }
}

When using Kafka connections to consume data, specifying an appropriate groupId is important, as it affects how messages are received.

If you are experiencing long delays at the start of a query, or no results are returned, you may need to modify your groupId.

You can set groupId in connectionParameters to specify the group id of your Orbital consumer.

kafka {
   my-kafka-connection {
      connectionName=my-kafka-connection
      connectionParameters {
         brokerAddress="localhost:29092,localhost:39092"
         groupId="myGroupId"
      }
   }
}

Read more about consumer group IDs in the Kafka documentation.

Troubleshooting connection problems

Using env variables in auth settings

It's common in Kafka configs for usernames and passwords to be embedded within settings such as sasl.jaas.config.

It's recommended that usernames and passwords are read from env variables. However, substituting them into these settings can lead to tricky quoting issues.

The solution is to rely on HOCON concatenation. eg:

"sasl.jaas.config"="org.apache.kafka.common.security.plain.PlainLoginModule required username="${KAFKA_USERNAME}" password="${KAFKA_PASSWORD}";"

Pay close attention to how the " characters in the setting are paired - i.e., there is no escaping, and no use of ' to nest inner values.

For example:

kafka {
   my-kafka-connection {
      // ... omitted for brevity
      connectionParameters {
         "sasl.jaas.config"="org.apache.kafka.common.security.plain.PlainLoginModule required username='myKafkaUser' password="${KAFKA_PASSWORD}";"
      }
   }
}

Query doesn't return any data (even though data exists)

You might have more consumers using the same groupId than the configured partition count for the topic.
In this case, your Orbital instance won't receive any data from the topic. Set a new groupId for your Kafka connection:

kafka {
   my-kafka-connection {
      connectionName=my-kafka-connection
      connectionParameters {
         brokerAddress="localhost:29092,localhost:39092"
         groupId="myGroupId"
      }
   }
}

Long connection times when starting a query

When multiple Orbital instances connect (and disconnect) to the same Kafka broker with the same groupId (such as the default setting of groupId = orbital), this can trigger consumer group rebalancing on the Kafka broker.

As a result, long delays are sometimes observed before data is streamed from Kafka, while the cluster is rebalancing.

Resolve this by setting the groupId for your Kafka connection to a unique value in your connections.conf.
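
For example, here is a minimal sketch that derives the group id from an environment variable using HOCON concatenation. ORBITAL_INSTANCE_ID is an assumed variable, not something Orbital provides - any value that differs per instance works:

kafka {
   my-kafka-connection {
      connectionName=my-kafka-connection
      connectionParameters {
         brokerAddress="localhost:29092,localhost:39092"
         // ORBITAL_INSTANCE_ID is an assumed env variable; substitute any per-instance value
         groupId="orbital-"${ORBITAL_INSTANCE_ID}
      }
   }
}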

Querying a Kafka topic

A Kafka topic can be queried using a standard Taxi query, including enriching data from Kafka with other services, and joining multiple streams together.

Exposing a topic

Kafka topics are declared in Taxi as simple operations which return a Stream of data.

// 1: Add the required imports
import com.orbitalhq.kafka.KafkaService
import com.orbitalhq.kafka.KafkaOperation

// 2: Annotate the service as a `KafkaService`.
@KafkaService( connectionName = "market-prices" )
service MyKafkaService {

   // 3: Annotate the operation as a `@KafkaOperation`
   @KafkaOperation( topic = "stockPrices", offset = "earliest" )
   // The topic is declared as a stream, and returns a Stream<>
   stream streamStockPrices():Stream<StockPrice>
}
  1. Add the required imports
  2. Annotate the service with @KafkaService
  3. Annotate the operation as a @KafkaOperation, specifying the topic and offset.
  4. The return type should be a Stream<> of the defined message type.

Keys, Headers and Metadata

The message key, message headers, and Kafka-specific metadata (such as offset, partition and timestamp) can be read into a Taxi model using annotations:

Message Key

The message key can be accessed using the com.orbitalhq.kafka.KafkaMessageKey annotation on a field:

import com.orbitalhq.kafka.KafkaMessageKey

model Movie {
  // The key from Kafka will be read into the id property
  @KafkaMessageKey
  id : MovieId inherits String
  title : Title inherits String
}

// Rest of the kafka topic declaration continues...
@KafkaService( connectionName = "moviesConnection" )
service MovieService {
  @KafkaOperation( topic = "movies", offset = "earliest" )
  stream streamMovieQuery:Stream<Movie>
}

Kafka Metadata

Kafka metadata (such as offset, partition and timestamp) can be accessed using the com.orbitalhq.kafka.KafkaMessageMetadata annotation on a field.

KafkaMessageMetadata takes a single parameter: the type of metadata you wish to read. This is defined by the enum KafkaMetadataType, which has the following values:

enum KafkaMetadataType {
  Partition,
  Offset,
  Timestamp,
  TimestampType,
  Topic
}

For example:

import com.orbitalhq.kafka.KafkaMessageMetadata
import com.orbitalhq.kafka.KafkaMetadataType

model Movie {
  @KafkaMessageMetadata(KafkaMetadataType.Offset)
  offset : Int

  @KafkaMessageMetadata(KafkaMetadataType.Timestamp)
  timestamp : Long

  @KafkaMessageMetadata(KafkaMetadataType.TimestampType)
  timestampType : String

  @KafkaMessageMetadata(KafkaMetadataType.Partition)
  partition : Int

  @KafkaMessageMetadata(KafkaMetadataType.Topic)
  topic : String

  // Other fields continue...
  title : Title inherits String
}

// Rest of the kafka topic declaration continues...
@KafkaService( connectionName = "moviesConnection" )
service MovieService {
  @KafkaOperation( topic = "movies", offset = "earliest" )
  stream streamMovieQuery:Stream<Movie>
}

Headers

Kafka supports including arbitrary message headers along with the message; these are often used for things like correlation keys.

These headers can be accessed using the com.orbitalhq.kafka.KafkaHeader annotation:

import com.orbitalhq.kafka.KafkaHeader

model Movie {
   @KafkaHeader("correlationId")
   correlationId : CorrelationId inherits String
   title : Title inherits String
}

 // Rest of the kafka topic declaration continues...
@KafkaService( connectionName = "moviesConnection" )
service MovieService {
   @KafkaOperation( topic = "movies", offset = "earliest" )
   stream streamMovieQuery:Stream<Movie>
}

Controlling deserialization

Message deserialization is defined by the model type being exposed. By default, models are expected to be JSON.

However, this can be controlled by annotating the model with a format annotation.

Two common formats, Protobuf and Avro, are supported.
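
As an illustrative sketch only, a model describing Protobuf-encoded messages might look like the following. The annotation names and import paths shown here are assumptions; consult the serialization format documentation for the exact annotations supported in your version:

// Assumed imports - check the formats documentation for the exact paths
import lang.taxi.formats.ProtobufMessage
import lang.taxi.formats.ProtobufField

// Annotating the model switches deserialization from the default (JSON)
// to Protobuf for any Kafka topic returning this type
@ProtobufMessage
model StockPrice {
   @ProtobufField(tag = 1)
   symbol : StockSymbol inherits String

   @ProtobufField(tag = 2)
   price : Price inherits Decimal
}

The Kafka service and operation declarations are unchanged; the format is driven entirely by the model being returned.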

Example queries

Streaming data from Kafka

// Invokes the `streamStockPrices` stream declared above
stream { StockPrice }

Enrich data from Kafka with other data sources

Data from a Kafka topic can be projected to enrich it with data from other sources.

Data requested that is not present on the Kafka payload is looked up from other sources, using Orbital’s standard projections.

stream { StockPrice } as {
  ticker : StockTicker  // available on the Kafka topic
  lastTradedPrice : LastTradedPrice // Looked up from another data source
}[]

Filtering Kafka streams

This example reads all messages from the Kafka topic, but only emits those with a stock ticker of AAPL on the resulting stream:

stream { StockPrice.filterEach( ( StockTicker ) -> StockTicker == 'AAPL' ) }

Streaming from Kafka to a database

Streams from Kafka can be inserted into a database (or any other writable source, such as Hazelcast or DynamoDB) using a mutating query.

As with all mutating queries, it’s not necessary for the data from Kafka to align with the format of the data being written to the data destination.

Orbital will automatically adapt the query result to the required persistence format, which may involve projections and even calling additional services if needed.

// First, ensure that your data destination exposes a writable data source
// Full config omitted for brevity
service MyDatabaseService {
   @UpsertOperation
   write operation updateStockPrices(StockPriceSnapshot):StockPriceSnapshot
}

// Then, define a streaming query.
// In this example, the data format for StockPrice coming off of Kafka
// is different from the data being written to our database (StockPriceSnapshot)
// so Orbital transforms the data automatically
stream { StockPrice }
call MyDatabaseService::updateStockPrices

Joining multiple streams

It is possible to join multiple Kafka streams together.

Writing to a Kafka topic

To make a topic writable, declare a write operation in a Kafka service:

// 1: Add the required imports
import com.orbitalhq.kafka.KafkaService
import com.orbitalhq.kafka.KafkaOperation

// 2: Annotate the service as a `KafkaService`.
@KafkaService( connectionName = "market-prices" )
service MyKafkaService {

   // ...other kafka topics omitted...

   // 3: Annotate the operation as a `@KafkaOperation`
   @KafkaOperation( topic = "stockPrices", offset = "earliest" )
   // The operation is declared as a write operation
   write operation publishStockPrice(StockPrice):StockPrice
}

Examples

Writing a static value onto a Kafka topic

given { stockPrice : StockPrice = 
  {
    symbol : 'AAPL',
    price : 12.00203
  } 
}
call MyKafkaService::publishStockPrice

Consuming from one Kafka topic, and writing to another topic

To stream data from a Kafka topic, enrich it, and republish onto another topic, declare both the source stream and a write operation:

@KafkaService( connectionName = "market-prices" )
service MyKafkaService {

   @KafkaOperation( topic = "stockPrices" )
   stream prices : Stream<StockPrice>

   @KafkaOperation( topic = "enrichedPrices" )
   write operation publishEnrichedPrices(EnrichedStockPrice):EnrichedStockPrice 
} 

The following query will consume from the stockPrices topic and, for each message, transform it to an EnrichedStockPrice, invoking any other services needed to source the required data.

stream { StockPrice }
// The input parameter to publishEnrichedPrices
// is an EnrichedStockPrice, so each incoming
// StockPrice message is transformed to an
// EnrichedStockPrice payload, and published onto the
// enrichedPrices topic
call MyKafkaService::publishEnrichedPrices

Building a REST API that publishes to Kafka

This is a full example, in which we create an HTTP endpoint that accepts a POST request containing a ticker symbol, looks up the current price, and publishes a price update to Kafka.

type StockSymbol inherits String
// StockPrice isn't declared elsewhere in this example; Decimal is an assumed base type
type StockPrice inherits Decimal
// The inbound request sent over HTTP requesting a stock price
model StockPricePublicationRequest {
  ticker : StockSymbol
}

// The message we'll be publishing to Kafka
parameter model StockPriceUpdate {
  ticker : StockSymbol
  currentPrice : StockPrice 
}

closed model CurrentStockPrice {
  price : StockPrice
}

service PriceService {
  @HttpOperation(url="http://fakeurl/prices/{symbol}", method = "GET")
  operation getCurrentPrice(@PathVariable("symbol") symbol:StockSymbol):CurrentStockPrice  
}

@KafkaService( connectionName = "market-prices" )
service MyKafkaService {
   @KafkaOperation( topic = "stockPrices", offset = "earliest" )
   write operation publishStockPrice(StockPriceUpdate):StockPriceUpdate
}

@HttpOperation(path = "/api/q/publishStockPrice", method = "POST")
query MySavedQuery(@RequestBody request:StockPricePublicationRequest) {
  given { request }
  call MyKafkaService::publishStockPrice
}

The above example works as follows:

  • A POST request is sent to /api/q/publishStockPrice with a body of:
{ "ticker" : "AAPL" }
  • The query asks for publishStockPrice to be called, which means a StockPriceUpdate must be constructed
  • To build a StockPriceUpdate, the currentPrice : StockPrice is required, which is available from the price field of the CurrentStockPrice object, returned from getCurrentPrice
  • A request to http://fakeurl/prices/AAPL is issued to discover the current stock price, returning:
{ "price" : 117.34 }
  • Finally, we have enough information to build a StockPriceUpdate, so the message is published to Kafka:
{ "ticker" : "AAPL", "currentPrice" : 117.34 }