Streaming data
Writing streaming queries
Streaming queries are executed in the same way as request/response queries, but use the stream keyword instead of find.
For example - assuming a Kafka topic emitting stock price updates:
model StockPrice {
symbol : StockSymbol
price : StockPrice
}
service KafkaService {
stream stockPrices : Stream<StockPrice>
}
This can be queried using the following:
stream { StockPrice }
This can be combined with other standard querying tools, such as projections and mutations:
stream { StockPrice } as {
symbol : StockSymbol
updateReceived : Instant = now()
currentPrice : StockPrice
totalTradedQuantity : TotalTradedQuantity
}
call TradeBookService::saveTradeSnapshot
Filtering streams
To filter a stream, you can use the filterEach() function:
stream { StockQuotes.filterEach( StockSymbol -> StockSymbol == 'AAPL' ) }
Joining streams
It’s possible to join multiple streams into one - for example, combining the results of multiple Kafka topics.
Stream joins are either stateless or stateful. By default, a Union Type (Foo | Bar) produces a stateless join, and an Intersection Type (Foo & Bar) produces a stateful join.
The result of a joined stream is a type combining the properties of both types. To learn more about the type that results from Union and Intersection types, including how property naming conflicts are resolved, see the Taxi docs.
Stateless joins
Streams of union types (stream { Foo | Bar }) emit a message whenever either Foo or Bar emits one. These are considered stateless, as
messages are emitted directly, without any attempt to link the messages.
A stream of a union type is stateless by default (but can be made stateful, as described below).
model OrderPlacedEvent {
orderId : OrderId
customerId : CustomerId
}
model FoodDeliveryEvent {
orderId : OrderId
deliveryState : DeliveryState
}
// Emits messages from both OrderPlacedEvent and FoodDeliveryEvent as they arrive
stream { OrderPlacedEvent | FoodDeliveryEvent }
Results in:
// OrderPlacedEvent - no details from FoodDeliveryEvent
{ "orderId": 123, "customerId" : 456, "deliveryState": null }
// FoodDeliveryEvent - no details from OrderPlacedEvent
{ "orderId": 123, "customerId" : null, "deliveryState": "PickedUp" }
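The stateless behaviour can be modelled in a few lines of Python. This is only a sketch of the semantics described above, not Orbital's implementation; the function name stateless_union and the hard-coded field list are assumptions made for the illustration.

```python
# Hypothetical model of a stateless union stream (not Orbital code).
# Each incoming message is emitted immediately, with None for any
# field of the combined type that the message does not carry.
UNION_FIELDS = ["orderId", "customerId", "deliveryState"]


def stateless_union(messages):
    for message in messages:
        # No state is kept between messages, so nothing is linked.
        yield {field: message.get(field) for field in UNION_FIELDS}


events = [
    {"orderId": 123, "customerId": 456},            # OrderPlacedEvent
    {"orderId": 123, "deliveryState": "PickedUp"},  # FoodDeliveryEvent
]
results = list(stateless_union(events))
```

Each input produces exactly one output, and the two outputs never share data, even though they carry the same orderId.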
Field merging
The result of a union or intersection type contains all the fields from both types. However, orderId only appears once, as it has the same name and type on both types.
Learn more about how field naming conflicts are resolved in the Taxi docs
Stateful joins
A stateful join is where multiple streams are joined, and the messages between those streams are linked.
Stateful streams exist in two flavours - using Union types and using Intersection types.
State stores & memory considerations
Stateful streams have to hold state as messages arrive, and use a State Store. If a state store is not configured in the query, then state is held in Orbital's default state store - which is the Orbital cluster itself.
Depending on your instance size, this can result in out-of-memory issues on Orbital in heavy workloads where Orbital has to retain state.
Read more about how to configure a state store
Intersection Types
A stream using an intersection type emits messages only when all the streams have emitted a message for a given id.
model OrderPlacedEvent {
@Id
orderId : OrderId
customerId : CustomerId
}
model FoodDeliveryEvent {
orderId : OrderId
deliveryState : DeliveryState
}
// Emits a combined message only after both OrderPlacedEvent and
// FoodDeliveryEvent have produced an event for a given orderId
stream { OrderPlacedEvent & FoodDeliveryEvent }
Results in:
// First event - an OrderPlacedEvent is written to a Kafka topic,
// but nothing is emitted on the Orbital stream until a corresponding FoodDeliveryEvent arrives
// Kafka:
{ "orderId": 123, "customerId" : 456 }
// Orbital:
// No event
// Second event: a FoodDeliveryEvent written on a Kafka topic,
// and a corresponding event on Orbital, combining data from both streams:
// Kafka:
{ "orderId" : 123, "deliveryState" : "PickedUp"}
// Orbital:
{ "orderId": 123, "customerId" : 456, "deliveryState": "PickedUp" }
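As a rough mental model, an intersection join behaves like the Python sketch below. It is an illustration under assumptions, not Orbital's implementation: the function intersection_join, the in-memory dict standing in for the state store, and the (source, payload) tuple shape are all hypothetical. What happens on later updates for an id that has already matched is not covered by the description above; this sketch simply emits again.

```python
# Hypothetical model of a stateful intersection join (not Orbital code).
def intersection_join(messages,
                      sources=("OrderPlacedEvent", "FoodDeliveryEvent"),
                      key="orderId"):
    state = {}  # join key value -> {source name: latest payload}
    for source, payload in messages:
        seen = state.setdefault(payload[key], {})
        seen[source] = payload
        # Emit only once every source has produced a message for this key.
        if all(name in seen for name in sources):
            merged = {}
            for name in sources:
                merged.update(seen[name])
            yield merged


kafka_messages = [
    ("OrderPlacedEvent", {"orderId": 123, "customerId": 456}),
    ("FoodDeliveryEvent", {"orderId": 123, "deliveryState": "PickedUp"}),
]
joined = list(intersection_join(kafka_messages))
```

The dict named state is the part that grows with the number of unmatched ids, which is why stateful joins need a state store.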
Union types
A stream using a Union type emits messages as they arrive from each stream, joining messages together.
Streams with union types are stateless by default, so to enable state, you must use the @StateStore annotation.
// Using the same model as above, omitted here for brevity
// Emits messages from both OrderPlacedEvent and FoodDeliveryEvent as they arrive
@StateStore
stream { OrderPlacedEvent | FoodDeliveryEvent }
Results in:
// First event - an OrderPlacedEvent written on a Kafka topic.
// Kafka:
{ "orderId": 123, "customerId" : 456 }
// Orbital:
{ "orderId": 123, "customerId" : 456, "deliveryState": null }
// Second event: a FoodDeliveryEvent written on a Kafka topic,
// and a corresponding event on Orbital, combining data from both streams:
// Kafka:
{ "orderId" : 123, "deliveryState" : "PickedUp"}
// Orbital:
{ "orderId": 123, "customerId" : 456, "deliveryState": "PickedUp" }
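The difference from the stateless case is that each emitted message includes whatever has already been seen for the same id. A minimal Python sketch of that behaviour follows; it is an assumption-laden model rather than Orbital's implementation, and the function stateful_union and its in-memory dict are hypothetical stand-ins for the real state store.

```python
# Hypothetical model of a stateful union join (not Orbital code).
def stateful_union(messages,
                   fields=("orderId", "customerId", "deliveryState"),
                   key="orderId"):
    state = {}  # join key value -> merged view of everything seen so far
    for payload in messages:
        # Start from an all-None record the first time a key is seen.
        merged = state.setdefault(payload[key], dict.fromkeys(fields))
        merged.update(payload)
        # Emit on every arrival, including previously stored fields.
        yield dict(merged)


kafka_messages = [
    {"orderId": 123, "customerId": 456},            # OrderPlacedEvent
    {"orderId": 123, "deliveryState": "PickedUp"},  # FoodDeliveryEvent
]
emitted = list(stateful_union(kafka_messages))
```

Unlike the intersection join, the first message is emitted straight away with deliveryState unset; the second message then carries the stored customerId.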
Configuring a state store
When joining a stateful stream, Orbital needs to hold state - which requires a state store.
Orbital configures a default state store, using an embedded Hazelcast instance. This is a high performance state store, allowing state to be shared across a cluster.
However, this does share the same RAM as used by the Orbital cluster. Under heavy load, this could result in out-of-memory issues on your Orbital cluster.
It is possible to configure an external state store, using a dedicated standalone cache. Currently only Hazelcast is supported, but support for other external caches is planned.
To define a dedicated external state store, first deploy an external Hazelcast cluster, then add it as a connection in Orbital.
hazelcast {
myHazelcast {
connectionName = myHazelcast
addresses = ["localhost:5701"]
}
}
To use this state store in your queries, use the connection name in the @StateStore annotation:
@StateStore(connection = "myHazelcast" )
stream { Foo | Bar }
or:
@StateStore(connection = "myHazelcast" )
query MySavedQuery {
stream { Foo | Bar }
}
StateStore configuration options
Parameter name | Definition | Default value |
---|---|---|
connection | The configured connection to use for the state store | null - defaults to storing in Orbital’s internal memory |
name | A name for this state store | null - a name is automatically generated if not provided |
maxIdleSeconds | The expiration period in seconds. State will be evicted when this expires (resets on read or write) | 180 |
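
The "resets on read or write" behaviour of maxIdleSeconds can be pictured with a small Python model. This is a sketch of idle-based expiry in general, not Hazelcast's or Orbital's implementation; the class IdleExpiringStore and its injectable clock are hypothetical and exist only to make the example deterministic.

```python
import time


# Hypothetical model of idle-based expiry (not Hazelcast or Orbital code).
class IdleExpiringStore:
    def __init__(self, max_idle_seconds=180, clock=time.monotonic):
        self.max_idle_seconds = max_idle_seconds
        self._clock = clock
        self._entries = {}  # key -> (value, time of last read or write)

    def put(self, key, value):
        # Writing resets the idle timer.
        self._entries[key] = (value, self._clock())

    def get(self, key):
        entry = self._entries.get(key)
        if entry is None:
            return None
        value, last_access = entry
        now = self._clock()
        if now - last_access > self.max_idle_seconds:
            # Idle for too long: evict and report a miss.
            del self._entries[key]
            return None
        # Reading also resets the idle timer.
        self._entries[key] = (value, now)
        return value


# A fake clock makes the timeline explicit.
now = [0.0]
store = IdleExpiringStore(max_idle_seconds=180, clock=lambda: now[0])
store.put(123, {"customerId": 456})
now[0] = 100.0
first_read = store.get(123)   # 100s idle: still present, timer resets
now[0] = 250.0
second_read = store.get(123)  # 150s since last read: still present
now[0] = 500.0
third_read = store.get(123)   # 250s idle: evicted
```

In this model an entry that keeps being touched never expires; only ids that go quiet for longer than the idle period are dropped.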
Limitations
When using Hazelcast as a state store, a Hazelcast Map is created to store the state, with a default maxIdleSeconds.
Once a map is created, Hazelcast does not support changing its configuration.
Attempting to change the configuration (e.g. the maxIdleSeconds) after the map has been created will cause an error.
To work around this, assign a new name to the state store, using the name parameter:
@StateStore(name = "NewStateStore", maxIdleSeconds = 30 )
query MySavedQuery {
stream { Foo | Bar }
}
Understanding joins
When joining messages between streams, messages are joined based on a shared Id.
Orbital looks for a type that is:
- Present on all models in the join
- Used by at least one field that has an @Id annotation
When joining:
- If no fields match the above criteria,
- or if more than one set of fields match the above criteria,
then the join is ambiguous and an error is thrown.
Note that field names are not relevant in joins - Orbital only considers each field's type.
For example:
model FoodOrder {
@Id
id : OrderId
}
model FoodDeliveryUpdate {
@Id
deliveryUpdateId : DeliveryUpdateId
orderId : OrderId
}
stream { FoodOrder | FoodDeliveryUpdate }
In the above scenario, a join is performed on the type OrderId, because it is the only type present on both models, where one of the models contains the @Id annotation.
DeliveryUpdateId was not considered, as it is not present on FoodOrder.
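The selection rules above can be expressed as a short Python sketch. It is a model of the described rules only, not Orbital's implementation; the function resolve_join_type and the (field name, type name, has @Id) tuple representation are assumptions made for the example.

```python
# Hypothetical model of join-type resolution (not Orbital code).
# models: {model name: [(field name, type name, has @Id annotation), ...]}
def resolve_join_type(models):
    # Types present on every model. Field names are ignored.
    shared = set.intersection(
        *({type_name for _, type_name, _ in fields}
          for fields in models.values())
    )
    # Types that carry @Id on at least one field of some model.
    id_types = {
        type_name
        for fields in models.values()
        for _, type_name, has_id in fields
        if has_id
    }
    candidates = shared & id_types
    if len(candidates) != 1:
        # Zero matches or several matches: the join is ambiguous.
        raise ValueError(
            f"Ambiguous join, candidate types: {sorted(candidates)}"
        )
    return next(iter(candidates))


models = {
    "FoodOrder": [("id", "OrderId", True)],
    "FoodDeliveryUpdate": [
        ("deliveryUpdateId", "DeliveryUpdateId", True),
        ("orderId", "OrderId", False),
    ],
}
join_type = resolve_join_type(models)
```

Here join_type is OrderId: DeliveryUpdateId has an @Id annotation but is missing from FoodOrder, so it never becomes a candidate.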
Running long-lived streaming queries
Orbital’s query editor is great for running short-lived streaming queries. However, you often want a streaming query to continue running in the background.
To deploy a long-lived streaming query, simply define a query in one of your taxonomy projects. Typically this is checked into a git repository.
// A sample query that streams data from a Stock price stream,
// and writes to Postgres
query MySavedQuery {
stream { StockPrices }
call MyPostgresService::upsertStockPrice
}
Updating streaming queries
Streaming queries are automatically upgraded whenever their definition changes.
Enabling a streaming query
When a streaming query is detected for the first time, it is disabled by default. This prevents accidental data changes, as streaming queries often perform mutations.
You can enable a streaming query either via the UI, or an API call.
Note that once a streaming query has been enabled, further updates are automatically deployed and the stream remains running.