
Streaming data

We're working on it...

This guide is under construction. If you hit some rough edges, or find it confusing, come yell at us on Slack.

This guide shows how to build bespoke, enriched event streams from Kafka.

Describing the data sources

The architecture for this demo

Our demo runs three services, which we’ll join together to create a bespoke event stream:

  • KafkaService: A Kafka topic that publishes a message whenever a new review is posted
  • FilmsDb: A database containing a list of films
  • ListingsService: A REST API that tells us where our movies are currently playing

Our services share a taxonomy used to describe the elements we can fetch:

taxonomy.taxi
namespace petflix

type FilmId inherits String
type FilmTitle inherits String
type ReviewText inherits String
type PerformanceDate inherits DateTime
type TheatreName inherits String

Adding the Kafka broker

Configure the Kafka connection in connections.conf:

connections.conf
kafka {
    "myKafkaBroker" {
        connectionName=myKafkaBroker
        connectionParameters {
            brokerAddress="localhost:9092"
            groupId=orbital
        }
    }
}
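If you already know Kafka's own client settings, it can help to see how these two parameters plausibly line up with standard Kafka consumer properties. The Python sketch below assumes `brokerAddress` corresponds to `bootstrap.servers` and `groupId` to `group.id`. That mapping is our illustration, not documented Orbital behavior, and `to_consumer_properties` is a hypothetical helper.

```python
# Hypothetical helper: the key mapping below is an illustrative assumption,
# not Orbital's documented implementation.
KAFKA_PROPERTY_NAMES = {
    "brokerAddress": "bootstrap.servers",
    "groupId": "group.id",
}


def to_consumer_properties(connection_parameters: dict) -> dict:
    """Rename connections.conf parameters to standard Kafka consumer property names.

    Keys without an entry in KAFKA_PROPERTY_NAMES are passed through unchanged.
    """
    return {
        KAFKA_PROPERTY_NAMES.get(key, key): value
        for key, value in connection_parameters.items()
    }


if __name__ == "__main__":
    params = {"brokerAddress": "localhost:9092", "groupId": "orbital"}
    print(to_consumer_properties(params))
    # {'bootstrap.servers': 'localhost:9092', 'group.id': 'orbital'}
```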

Describing the Kafka topic

Next, add Taxi metadata to the Protobuf message written to our Kafka topic, tagging each field with a semantic type from the taxonomy:

syntax = "proto3";
import "taxi/dataType.proto";

message NewReviewPostedMessage {
  int32 filmId = 1 [(taxi.dataType)="petflix.FilmId"];
  string reviewText = 2 [(taxi.dataType)="petflix.ReviewText"];
}
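These annotations let Orbital work with the message by meaning rather than by field name. The Python sketch below is a deliberately simplified model of that idea: it looks up a value by its semantic type and ignores what the field is called. It is our illustration, not how Orbital resolves types internally. `value_of_type` and the sample message are hypothetical.

```python
# Semantic type of each field, copied from the (taxi.dataType) options above
REVIEW_FIELD_TYPES = {
    "filmId": "petflix.FilmId",
    "reviewText": "petflix.ReviewText",
}


def value_of_type(message: dict, field_types: dict, wanted_type: str):
    """Return the value of the single field tagged with wanted_type, whatever its name."""
    matches = [name for name, t in field_types.items() if t == wanted_type]
    if len(matches) != 1:
        raise LookupError(
            f"expected exactly one field of type {wanted_type}, found {len(matches)}"
        )
    return message[matches[0]]


if __name__ == "__main__":
    msg = {"filmId": 42, "reviewText": "Great film"}
    print(value_of_type(msg, REVIEW_FIELD_TYPES, "petflix.FilmId"))  # 42
```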

Finally, declare a service that exposes the Kafka topic as a stream:

reviews.taxi
service KafkaService {
  stream reviews : Stream<NewReviewPostedMessage>
}

Creating an event stream

We can create a dedicated event stream containing exactly the data we need by submitting the following query to Orbital:

stream { NewReviewPostedMessage } as {
  // We define the field names that matter to us.
  // Orbital matches on data types, as field names often differ between systems. 
  id : FilmId 
  review : ReviewText

  name : FilmTitle // Fetched by a database call
  nextPerformances: FilmListing[] // Fetched by a REST API call
}
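Conceptually, for every message on the topic, the missing fields are looked up and one combined record is emitted. The Python sketch below models that per-event enrichment with plain functions. `find_title` and `find_listings` are hypothetical stand-ins for the FilmsDb query and the ListingsService call. The `FilmListing` fields are assumptions, because the guide doesn't define that type. In Orbital you don't write this loop yourself: the query above only declares the output shape, and Orbital works out which calls to make.

```python
from dataclasses import dataclass
from typing import Callable, Iterable, Iterator, List


@dataclass
class FilmListing:
    # Hypothetical fields: the guide does not define FilmListing's structure
    theatre_name: str
    performance_date: str  # ISO-8601 date string, kept simple for the sketch


def enrich_reviews(
    reviews: Iterable[dict],
    find_title: Callable[[int], str],
    find_listings: Callable[[int], List[FilmListing]],
) -> Iterator[dict]:
    """Yield one enriched record per incoming review event.

    find_title stands in for the FilmsDb lookup; find_listings stands in for the
    ListingsService REST call. Both are hypothetical placeholders.
    """
    for event in reviews:
        film_id = event["filmId"]
        yield {
            "id": film_id,
            "review": event["reviewText"],
            "name": find_title(film_id),
            "nextPerformances": find_listings(film_id),
        }


if __name__ == "__main__":
    # Sample in-memory data standing in for the real services
    titles = {1: "Example Film"}
    listings = {1: [FilmListing("Example Theatre", "2024-06-01")]}
    events = [{"filmId": 1, "reviewText": "Loved it"}]
    for record in enrich_reviews(events, titles.__getitem__, lambda fid: listings.get(fid, [])):
        print(record)
```

Because `enrich_reviews` is a generator, it processes events one at a time as they arrive, which matches the unbounded nature of a stream.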

Specifying the groupId for Kafka subscriptions

When Orbital subscribes to a Kafka topic for a streaming query, it uses the groupId connection parameter defined in connections.conf as the group ID of the corresponding Kafka consumer.

connections.conf
kafka {
    "myKafkaBroker" {
        connectionName=myKafkaBroker
        connectionParameters {
            brokerAddress="localhost:9092"
            groupId=orbital
        }
    }
}

In the above configuration, groupId is set to orbital. However, you can override the groupId value in your queries by using the @StreamConsumer annotation. Here is an example:

@StreamConsumer(id = "reviewStreamGroup")
stream { NewReviewPostedMessage } as {
  id : FilmId 
  review : ReviewText

  name : FilmTitle // Fetched by a database call
  nextPerformances: FilmListing[] // Fetched by a REST API call
}

The Kafka consumer created for the above streaming query uses reviewStreamGroup as its group ID.
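The precedence rule described in this section can be sketched in a few lines of Python. An `@StreamConsumer(id = ...)` value on the query wins, and otherwise the groupId from connections.conf applies. `resolve_group_id` is a hypothetical helper that models the documented behavior. It is not Orbital code.

```python
from typing import Optional


def resolve_group_id(connection_group_id: str, stream_consumer_id: Optional[str] = None) -> str:
    """Pick the Kafka consumer group ID for a streaming query.

    A non-empty @StreamConsumer id takes precedence; otherwise fall back to the
    groupId from connections.conf.
    """
    if stream_consumer_id:
        return stream_consumer_id
    return connection_group_id


if __name__ == "__main__":
    print(resolve_group_id("orbital"))                       # orbital
    print(resolve_group_id("orbital", "reviewStreamGroup"))  # reviewStreamGroup
```

Under standard Kafka semantics, consumers that share a group ID split the topic's partitions between them, so each message goes to only one member of the group. This is one reason you might give a streaming query its own group ID.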
