Streaming data
This guide shows how to build bespoke, enriched event streams from Kafka.
Describing the data sources
Our demo has a few services running, which we’ll join together to create a bespoke event stream:
- KafkaService: A topic that emits a message whenever a new review is posted
- FilmsDb: A database containing a list of films
- ListingsService: A REST API that tells us where our movies are currently playing
Our services share a taxonomy used to describe the elements we can fetch:
namespace petflix
type FilmId inherits String
type FilmTitle inherits String
type ReviewText inherits String
type PerformanceDate inherits DateTime
type TheatreName inherits String
Adding the Kafka broker
Configure the Kafka connection in connections.conf:
kafka {
  "myKafkaBroker" {
    connectionName=myKafkaBroker
    connectionParameters {
      brokerAddress="localhost:9092"
      groupId=orbital
    }
  }
}
Describing the Kafka topic
Next, add Taxi metadata to the Protobuf definition of the message written to our Kafka topic:
syntax = "proto3";
import "taxi/dataType.proto";
message NewReviewPostedMessage {
  int32 filmId = 1 [(taxi.dataType)="petflix.FilmId"];
  string reviewText = 2 [(taxi.dataType)="petflix.ReviewText"];
}
Finally, declare the Kafka topic:
service KafkaService {
  stream reviews : Stream<NewReviewPostedMessage>
}
Creating an event stream
We can create a dedicated event stream containing just the data we need by submitting the following query to Orbital:
stream { NewReviewPostedMessage } as {
  // We define the field names that matter to us.
  // Orbital matches on data types, as field names often differ between systems.
  id : FilmId
  review : ReviewText
  name : FilmTitle // Fetched by a database call
  nextPerformances : FilmListing[] // Fetched by a REST API call
}
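To make the shape of the enriched stream concrete, here is a conceptual sketch in Python of what the query asks for. It is an illustration only, not how Orbital implements enrichment. The FILMS and LISTINGS dictionaries are hypothetical in-memory stand-ins for FilmsDb and ListingsService, and the sample film, theatre and date are invented.

```python
from typing import Dict, Iterable, Iterator, List

# Hypothetical stand-in for FilmsDb: FilmId -> FilmTitle (invented sample data)
FILMS: Dict[str, str] = {"1": "A Dog's Tale"}

# Hypothetical stand-in for ListingsService: FilmId -> upcoming performances
LISTINGS: Dict[str, List[dict]] = {
    "1": [{"theatre": "Example Theatre", "performanceDate": "2024-01-01T19:30:00"}],
}


def enrich(reviews: Iterable[dict]) -> Iterator[dict]:
    """Yield one enriched event for each incoming review message."""
    for message in reviews:
        # The film id is the value shared by all three sources.
        film_id = str(message["filmId"])
        yield {
            "id": film_id,
            "review": message["reviewText"],
            "name": FILMS.get(film_id),                     # database lookup
            "nextPerformances": LISTINGS.get(film_id, []),  # REST API lookup
        }
```

Each incoming message produces exactly one output event, which carries the original review plus the fields looked up by film id. In this sketch, a film that is missing from either stand-in simply yields an empty value for that field.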
Specifying the groupId for Kafka subscriptions
When Orbital subscribes to a Kafka topic for a streaming query, it uses the groupId connection parameter defined in connections.conf to set the group id of the corresponding Kafka consumer:
kafka {
  "myKafkaBroker" {
    connectionName=myKafkaBroker
    connectionParameters {
      brokerAddress="localhost:9092"
      groupId=orbital
    }
  }
}
In the above configuration, the groupId is set to orbital. However, you can override the groupId value in your queries by using the @StreamConsumer annotation. Here is an example:
@StreamConsumer(id = "reviewStreamGroup")
stream { NewReviewPostedMessage } as {
  id : FilmId
  review : ReviewText
  name : FilmTitle // Fetched by a database call
  nextPerformances : FilmListing[] // Fetched by a REST API call
}
The Kafka consumer created for the above streaming query will set its group id to reviewStreamGroup.
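The precedence described in this section can be summarised in a few lines of Python. This is a sketch of the rule as stated here, not Orbital's code, and the function and parameter names are hypothetical.

```python
from typing import Optional


def resolve_group_id(connection_group_id: str,
                     stream_consumer_id: Optional[str] = None) -> str:
    """Pick the Kafka consumer group id for a streaming query.

    connection_group_id: the groupId from connections.conf (the default).
    stream_consumer_id: the id from @StreamConsumer, if the query has one.
    """
    # A query-level id takes precedence over the connection default.
    if stream_consumer_id:
        return stream_consumer_id
    return connection_group_id


# Query without an annotation falls back to the connection's groupId.
print(resolve_group_id("orbital"))
# Annotated query uses its own id.
print(resolve_group_id("orbital", "reviewStreamGroup"))
```

The choice matters because Kafka consumers that share a group id divide a topic's partitions between them, while consumers with different group ids each receive every message. Giving a query its own id therefore lets it read the topic independently of other queries on the same connection.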