Streaming data
Writing streaming queries
Streaming queries are executed in the same way as request/response queries, but use the stream keyword instead of find.
For example, assuming a Kafka topic that emits stock price updates:
model StockPrice {
   symbol : StockSymbol
   price : StockPrice
}
service KafkaService {
   stream stockPrices : Stream<StockPrice>
}
This can be queried using the following:
stream { StockPrice }
This can be combined with other standard querying tools, such as projections and mutations:
stream { StockPrice } as {
   symbol : StockSymbol
   updateReceived : Instant = now()
   currentPrice : StockPrice
   totalTradedQuantity : TotalTradedQuantity
}
call TradeBookService::saveTradeSnapshot
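For the call above to resolve, some service has to expose a mutating operation named saveTradeSnapshot. The source does not show that declaration, so the following is only a sketch. It assumes Taxi's write operation syntax for mutations, and the TradeSnapshot model is a hypothetical name whose fields mirror the projection above.

```taxi
// Hypothetical model: fields copied from the projection in the query above
model TradeSnapshot {
   symbol : StockSymbol
   updateReceived : Instant
   currentPrice : StockPrice
   totalTradedQuantity : TotalTradedQuantity
}

// Assumed declaration of the service invoked by the `call` clause.
// `write` marks the operation as mutating.
service TradeBookService {
   write operation saveTradeSnapshot(TradeSnapshot) : TradeSnapshot
}
```

Under these assumptions, each projected message from the stream would be passed to saveTradeSnapshot as it arrives.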
Filtering streams
To filter a stream, use the filterEach() function:
stream { StockQuotes.filterEach( StockSymbol -> StockSymbol == 'AAPL' ) }
Running long-lived streaming queries
Orbital’s query editor is well suited to running short-lived streaming queries. Often, however, you want a streaming query to keep running in the background.
Orbital’s stream server is responsible for long-lived streaming queries.
To deploy a long-lived streaming query, define a query in one of your taxonomy projects. Typically, the project is checked into a Git repository.
// A sample query that streams data from a Stock price stream,
// and writes to Postgres
query MySavedQuery {
   stream { StockPrice }
   call MyPostgresService::upsertStockPrice
}
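A saved query can presumably also carry a filter. The sketch below combines the filterEach() expression from the filtering section with the saved-query form shown above. It assumes that filterEach() behaves the same inside a saved query as it does in the query editor. AppleQuotesToPostgres is a hypothetical name, and StockQuotes is taken as-is from the filtering example, since it is not defined in this section.

```taxi
// Hypothetical saved query: stream only AAPL updates and upsert them to Postgres.
// The filter expression is copied verbatim from the filtering example;
// the service and operation names are reused from the saved query above.
query AppleQuotesToPostgres {
   stream { StockQuotes.filterEach( StockSymbol -> StockSymbol == 'AAPL' ) }
   call MyPostgresService::upsertStockPrice
}
```

Filtering before the call keeps unrelated symbols out of the database, which matters because a long-lived query writes continuously.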
Updating streaming queries
Streaming queries are automatically upgraded whenever their definition changes.
Enabling a streaming query
When a streaming query is detected for the first time, it is disabled by default. This prevents accidental data changes, since streaming queries often perform mutations.
You can enable a streaming query either via the UI or with an API call.
Once a streaming query has been enabled, further updates are deployed automatically and the stream remains running.
The stream server
Orbital’s Stream Server ships as a separate Docker image and is responsible for executing saved streaming queries.
version: "3.3"
services:
  ## Other services omitted
  stream-server:
    image: orbitalhq/stream-server:latest
The stream server exposes the following ports:
- An HTTP port on 9615 (you can deploy an SSL-terminating reverse proxy in front of it)
- An RSocket port on 7755
By default, Orbital expects to find the stream server at http://stream-server/. Our Docker Compose and Helm Chart configurations set this up for you.
However, if you need to change the address of the stream server, you can do so by editing the services.conf file deployed with Orbital’s main component:
services {
  stream-server {
    url="http://localhost:9615"
    rsocket="tcp://localhost:7755"
  }
}