Querying

Streaming data

Writing streaming queries

Streaming queries are executed in the same way as request/response queries, but use the stream keyword instead of find.

For example - assuming a Kafka topic emitting stock price updates:

model StockPrice {
  symbol : StockSymbol
  price : StockPrice
}

service KafkaService {
   stream stockPrices : Stream<StockPrice>
}

This can be queried using the following:

stream { StockPrice }

This can be combined with other standard querying tools, such as projections and mutations:

stream { StockPrice } as {
  symbol : StockSymbol
  updateReceived : Instant = now()
  currentPrice : StockPrice
  totalTradedQuantity : TotalTradedQuantity
}
call TradeBookService::saveTradeSnapshot

Filtering streams

To filter a stream, you can use the filterEach() function:

stream { StockQuotes.filterEach( StockSymbol -> StockSymbol == 'AAPL' ) }

Joining streams

It’s possible to join multiple streams to one - for example, combining the results of multiple Kafka topics.

Streams are joins are either stateless, or stateful, using either a Union Type (Foo | Bar) or an Intersection Type (Foo & Bar) respectively.

The result of a joined stream is a type combining the properties of both types. To learn more about the resulting type from Union and Intersection types, including how property naming conflicts are resolved in the Taxi docs

Stateless streams

Streams of union types (stream { Foo | Bar } emit as messages are emitted from either Foo or Bar. These are considered stateless, as messages are emitted directly, without any attempt to link the messages.

A stream of a union type is stateless by default (but can be made stateful, as described below)

model OrderPlacedEvent {
  orderId : OrderId
  customerId : CustomerId
}
model FoodDeliveryEvent {
  orderId : OrderId
  deliveryState : DeliveryState
}

// Emits messages from both Foo and Bar as they arrive
stream { OrderPlacedEvent | FoodDeliveryEvent }

Results in:

// OrderEvent - no details from DeliveryEvent
{ "orderId":  123, "customerId" :  456, "deliveryState":  null } 
// DeliveryEvent - no details from OrderEvent
{ "orderId":  123, "customerId" :  null, "deliveryState":  "PickedUp" } 

Field merging

The result of a union or intersection type contains all the fields from both types. However, orderId only appears once, as it has the same name and type on both types.

Learn more about how field naming conflicts are resolved in the Taxi docs

Stateful streams

A stateful stream is where multiple streams are joined, and the messages between those streams are linked.

Stateful streams exist in two flavours - using Union types and using Intersection types.

State stores & memory considerations

Stateful streams have to hold state as messages arrive, and use a State Store. If a state store is not configured in the query, then state is held in Orbital's default state store - which is the Orbital cluster itself.

Depending on your instance size, this can result in out-of-memory issues on Orbital in heavy workloads where Orbital has to retain state.

Read more about how to configure a state store

Intersection Types

A stream using an intersection type emits messages only when all the streams have emitted a message for a given id.

model OrderPlacedEvent {
  @Id
  orderId : OrderId
  customerId : CustomerId
}
model FoodDeliveryEvent {
  orderId : OrderId
  deliveryState : DeliveryState
}

// Emits messages from both Foo and Bar only 
// after both have produced an event for a given message
stream { OrderPlacedEvent & FoodDeliveryEvent }

Results in:

// First event - an OrderPlacedEvent written on a Kafka topic, 
// but is not emitted on the Orbital stream until a corresponding FoodDeliveryEvent is emitted

// Kafka:
{ "orderId":  123, "customerId" :  456 }
// Orbital:
// No event

// Second event: a FoodDeliveryEvent written on a Kafka topic,
// and a corresponding event on Orbital, combining data from both streams:
// Kafka:
{ "orderId" : 123, "deliveryState" : "PickedUp"}
// Orbital:
{ "orderId":  123, "customerId" :  456, "deliveryState":  "PickedUp" } 

Union types

A stream using a Union type emits messages as they arrive from each stream, joining messages together.

Streams with union types are stateless by default, so to enable state, you must use the @StateStore annotation.

// Using the same model as above, omitted here for brevity

// Emits messages from both Foo and Bar as they arrive
@StateStore
stream { OrderPlacedEvent | FoodDeliveryEvent }

Results in:

// First event - an OrderPlacedEvent written on a Kafka topic. 
// Kafka:
{ "orderId":  123, "customerId" :  456 }
// Orbital:
{ "orderId":  123, "customerId" :  456, "deliveryState":  null }

// Second event: a FoodDeliveryEvent written on a Kafka topic,
// and a corresponding event on Orbital, combining data from both streams:
// Kafka:
{ "orderId" : 123, "deliveryState" : "PickedUp"}
// Orbital:
{ "orderId":  123, "customerId" :  456, "deliveryState":  "PickedUp" } 

Configuring a state store

When joining a stateful stream, Orbital needs to hold state - which requires a state store.

Orbital configures a default state store, using an embedded Hazelcast instance. This is a high performance state store, allowing state to be shared across a cluster.

However, this does share the same RAM as used by the Orbital cluster. Under heavy load, this could result in out-of-memory issues on your Orbital cluster.

It is possible to configure an external state store, using a dedicated standalone cache. Currently only Hazelcast is supported, but support for other external caches is planned.

To define a dedicated external state store, first deploy an external Hazelcast cluster, then add it as a connection in Orbital.

hazelcast {
   myHazelcast {
      connectionName = myHazelcast
      addresses = ["localhost:5701"]
   }
}

To use this state store in your queries, use the connection name in the @StateStore annotation:

@StateStore(connection = "myHazelcast" )
stream { Foo | Bar }

or:

@StateStore(connection = "myHazelcast" )
query MySavedQuery {
  stream { Foo | Bar }
}

StateStore configuration options

Parameter nameDefinitionDefault value
connectionThe configured connection to use for the state storenull - default to storing in Orbital’s internal memory
nameA name for this state storenull - a name is automatically generated if not provided
maxIdleSecondsThe expiration period in seconds. State will be evicted when this expires (resets on read or write)180

Limitations

When using Hazelcast as a state store, an Hazelcast Map is created to store the state, with a default maxIdleSeconds. Once a map is created, Hazlecast does not support changing it’s configuration.

Attempting to change the configuration (eg the maxIdleSeconds) after the map has been created will cause an error. To work around this, assign a new name to the state store, using the name parameter:

@StateStore(name = "NewStateStore", maxIdleSeconds = "30" )
query MySavedQuery {
  stream { Foo | Bar }
}

Understanding joins

When joining messages between streams, messages are joined based on a shared Id.

Orbital looks for a type that is:

  • Present on all models present in the join
  • Where at least one of the fields has an @Id annotation.

When joining:

  • If no fields match the above criteria,
  • or if more than one set of fields match the above criteria,

then the join is ambiguous and an error is thrown.

Note that field names are not relevant in joins - Orbital only considers each fields type.

For example:

model FoodOrder {
  @Id
  id : OrderId
}
model FoodDeliveryUpdate {
  @Id
  deliveryUpdateId : DeliveryUpdateId
  orderId : OrderId
}

stream { FoodOrder | FoodDeliveryUpdate }

In the above scenario, a join is performed on the type OrderId, because it is the only type present on both models, where one of the models contains the @Id annotation.

DeliveryUpdateId was not considered, as it is not present on FoodOrder.

Running long-lived streaming queries

Orbital’s query editor is great for running short-lived streaming queries. However, often times you want a streaming query to continue in the background.

To deploy a long-lived streaming query, simply define a query in one of your taxonomy projects. Typically this is checked into a git repository.

MySavedQuery.taxi
// A sample query that streams data from a Stock price stream,
// and writes to Postgres
query MySavedQuery {
   stream { StockPrices }
   call MyPostgresService::upsertStockPrice
}

Updating streaming queries

Streaming queries are automatically upgraded whenever their definition changes.

Enabling a streaming query

When each streaming query is detected for the first time, it’s disabled by default - to prevent accidental data changes (as streams are often mutating).

You can enable a streaming query either via the UI, or an API call.

Note that once a streaming query has been enabled once, further updates are automatically deployed and the stream remains running.

Previous
Mutations
Next
Publishing queries (http / websockets)