Querying

Streaming data

Writing streaming queries

Streaming queries are executed in the same way as request/response queries, but use the stream keyword instead of find.

For example - assuming a Kafka topic emitting stock price updates:

model StockPrice {
  symbol : StockSymbol
  price : StockPrice
}

service KafkaService {
   stream stockPrices : Stream<StockPrice>
}

This can be queried using the following:

stream { StockPrice }

This can be combined with other standard querying tools, such as projections and mutations:

stream { StockPrice } as {
  symbol : StockSymbol
  updateReceived : Instant = now()
  currentPrice : StockPrice
  totalTradedQuantity : TotalTradedQuantity
}
call TradeBookService::saveTradeSnapshot

Filtering streams

To filter a stream, you can use the filterEach() function:

stream { StockQuotes.filterEach( StockSymbol -> StockSymbol == 'AAPL' ) }

Running long-lived streaming queries

Orbital’s query editor is great for running short-lived streaming queries. However, often times you want a streaming query to continue in the background.

Orbital’s stream server is responsible for long-lived streaming queries.

To deploy a long-lived streaming query, simply define a query in one of your taxonomy projects. Typically this is checked into a git repository.

MySavedQuery.taxi
// A sample query that streams data from a Stock price stream,
// and writes to Postgres
query MySavedQuery {
   stream { StockPrices }
   call MyPostgresService::upsertStockPrice
}

Updating streaming queries

Streaming queries are automatically upgraded whenever their definition changes.

Enabling a streaming query

When each streaming query is detected for the first time, it’s disabled by default - to prevent accidental data changes (as streams are often mutating).

You can enable a streaming query either via the UI, or an API call.

Note that once a streaming query has been enabled once, further updates are automatically deployed and the stream remains running.

Previous
Mutations
Next
Publishing queries (http / websockets)