Querying
Streaming data
Writing streaming queries
Streaming queries are executed in the same way as request/response
queries, but use the stream
keyword instead of find
.
For example - assuming a Kafka topic emitting stock price updates:
model StockPrice {
symbol : StockSymbol
price : StockPrice
}
service KafkaService {
stream stockPrices : Stream<StockPrice>
}
This can be queried using the following:
stream { StockPrice }
This can be combined with other standard querying tools, such as projections and mutations:
stream { StockPrice } as {
symbol : StockSymbol
updateReceived : Instant = now()
currentPrice : StockPrice
totalTradedQuantity : TotalTradedQuantity
}
call TradeBookService::saveTradeSnapshot
Filtering streams
To filter a stream, you can use the filterEach()
function:
stream { StockQuotes.filterEach( StockSymbol -> StockSymbol == 'AAPL' ) }
Running long-lived streaming queries
Orbital’s query editor is great for running short-lived streaming queries. However, often times you want a streaming query to continue in the background.
To deploy a long-lived streaming query, simply define a query in one of your taxonomy projects. Typically this is checked into a git repository.
// A sample query that streams data from a Stock price stream,
// and writes to Postgres
query MySavedQuery {
stream { StockPrices }
call MyPostgresService::upsertStockPrice
}
Updating streaming queries
Streaming queries are automatically upgraded whenever their definition changes.
Enabling a streaming query
When each streaming query is detected for the first time, it’s disabled by default - to prevent accidental data changes (as streams are often mutating).
You can enable a streaming query either via the UI, or an API call.
Note that once a streaming query has been enabled once, further updates are automatically deployed and the stream remains running.