Streaming data

This guide shows how to build bespoke, enriched event streams from Kafka.

Describing the data sources

The architecture for this demo
Our demo has a few services running, which we’ll join together to create a bespoke event stream:

  • Kafka Service: A topic that emits a message whenever a new review is posted
  • FilmsDb: A database containing a list of films
  • ListingsService: A REST API that tells us where our movies are currently playing

Our services share a taxonomy used to describe the elements we can fetch:
namespace petflix

type FilmId inherits String
type FilmTitle inherits String
type ReviewText inherits String
type PerformanceDate inherits DateTime
type TheatreName inherits String
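
The taxonomy matters because each system can name its fields differently while tagging them with the same semantic type. The Python sketch below illustrates that idea only. The `FILMS_DB_SCHEMA` field names and the `field_for_type` helper are hypothetical and are not part of Orbital or Taxi.

```python
from typing import Dict

# Field name -> semantic type from the shared taxonomy.
# The Kafka field names follow the message in this guide;
# the database field names are assumed for illustration.
KAFKA_REVIEW_SCHEMA = {
    "filmId": "petflix.FilmId",
    "reviewText": "petflix.ReviewText",
}
FILMS_DB_SCHEMA = {
    "id": "petflix.FilmId",
    "title": "petflix.FilmTitle",
}


def field_for_type(schema: Dict[str, str], semantic_type: str) -> str:
    """Return the name of the single field tagged with the given semantic type."""
    matches = [name for name, tagged in schema.items() if tagged == semantic_type]
    if len(matches) != 1:
        raise LookupError(
            f"expected exactly one field of type {semantic_type}, found {len(matches)}"
        )
    return matches[0]


# The same semantic type locates the linking field in each system,
# even though the field names differ.
kafka_key = field_for_type(KAFKA_REVIEW_SCHEMA, "petflix.FilmId")
db_key = field_for_type(FILMS_DB_SCHEMA, "petflix.FilmId")
```

Here `kafka_key` and `db_key` name different fields, yet both carry a `petflix.FilmId`, which is what allows the two sources to be linked without agreeing on field names.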

Adding the Kafka broker

Configure the Kafka connection in connections.conf:

kafka {
    "myKafkaBroker" {
        connectionParameters {

Describing the Kafka topic

Next, add Taxi metadata to the Protobuf message written to our Kafka topic:

syntax = "proto3";
import "taxi/dataType.proto";

message NewReviewPostedMessage {
  int32 filmId = 1 [(taxi.dataType)="petflix.FilmId"];
  string reviewText = 2 [(taxi.dataType)="petflix.ReviewText"];
}

Finally, declare the Kafka topic:
service KafkaService {
  stream reviews : Stream<NewReviewPostedMessage>
}

Creating an event stream

We can create a dedicated event stream containing only the data we need by submitting the following query to Orbital:

stream { NewReviewPostedMessage } as {
  // We define the field names that matter to us.
  // Orbital matches on data types, as field names often differ between systems. 
  id : FilmId 
  review : ReviewText

  name : FilmTitle // Fetched by a database call
  nextPerformances: FilmListing[] // Fetched by a REST API call

Orbital performs the integration and delivers messages containing exactly the data we need.
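
Conceptually, the query describes a per-message enrichment: take each review from the topic, look up the film title, look up the listings, and emit a combined record. The Python sketch below shows that shape with in-memory stand-ins. It is not how Orbital is implemented, and `enrich_reviews`, `FILMS`, `LISTINGS`, and the sample values are all hypothetical.

```python
from typing import Callable, Dict, Iterable, Iterator, List


def enrich_reviews(
    reviews: Iterable[dict],
    lookup_title: Callable[[str], str],
    lookup_listings: Callable[[str], List[dict]],
) -> Iterator[dict]:
    """Yield one enriched event per incoming review message."""
    for message in reviews:
        film_id = str(message["filmId"])
        yield {
            "id": film_id,
            "review": message["reviewText"],
            # Stands in for the database call.
            "name": lookup_title(film_id),
            # Stands in for the REST API call.
            "nextPerformances": lookup_listings(film_id),
        }


# Hypothetical in-memory stand-ins for FilmsDb and ListingsService.
FILMS: Dict[str, str] = {"1": "Example Film"}
LISTINGS: Dict[str, List[dict]] = {
    "1": [{"theatre": "Example Theatre", "date": "2024-01-01T19:30:00"}],
}

events = list(
    enrich_reviews(
        [{"filmId": 1, "reviewText": "Great fun"}],
        lookup_title=FILMS.__getitem__,
        lookup_listings=lambda film_id: LISTINGS.get(film_id, []),
    )
)
```

Written by hand, this glue code has to know each source's field names and call order. The query states only the target shape and leaves the lookups to Orbital.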

