Managing data sources

Hazelcast as a data source

In addition to using Hazelcast as a cache, Orbital can use Hazelcast as a source for reading and writing data, as well as a streaming data source.

Adding a Hazelcast data source

To enable Hazelcast as a data source, first define a connection in your connections.conf file:

connections.conf
hazelcast {
   myHazelcastConnection {
      connectionName = myHazelcastConnection
      addresses = ["localhost:5701"]
   }
}

Supported formats

When values are written to Hazelcast, they must be serialized.

The following formats are supported. Because Orbital does not have access to custom Java classes, serialization formats that require classloading (such as Serializable, Externalizable and Custom Serialization) are not supported.

Compact (preferred)

Compact Serialization is the preferred format when using Orbital to read from and write to Hazelcast.

To enable Compact serialization, add a @CompactObject annotation to your Taxi definition:

import com.orbitalhq.hazelcast.CompactObject
import com.orbitalhq.hazelcast.HazelcastMap

@HazelcastMap(name = "films")
@CompactObject
closed model Film {
   @Id
   filmId : FilmId inherits Int
   // ...omitted...
}

JsonValue

Hazelcast’s JsonValue is supported for working with existing caches that have stored data using JsonValue.

Note: if you are both writing and reading the data using Orbital, use Compact instead, as it performs better.

Writing data to Hazelcast

To write data into a Hazelcast Map, define a Hazelcast service in your Taxi project, with a write operation:

import com.orbitalhq.hazelcast.HazelcastService
import com.orbitalhq.hazelcast.HazelcastMap
import com.orbitalhq.hazelcast.UpsertOperation
import com.orbitalhq.hazelcast.CompactObject

// The HazelcastService annotation should specify the 
// name of the connection defined in your connections.conf
@HazelcastService(connectionName = "myHazelcastConnection")
service HazelcastService {

   // Expose a write operation, and annotate it 
   // with UpsertOperation, to define the writing behaviour
   @UpsertOperation
   write operation upsert(Film):Film
}

// Define the name of the map to use. If not present in Hazelcast,
// the map will be created
@HazelcastMap(name = "films")
// specify the serialization format 
@CompactObject
closed model Film {
   // A field annotated with @Id becomes the key.
   // Composite keys are not supported
   @Id
   filmId : FilmId inherits Int
   title : Title inherits String
   languages: Language[]
}

With your map declared, use a mutation query to insert data:

Examples

Inserting a static value into Hazelcast

// inserting a static value into Hazelcast
given { film : Film = 
  {
    filmId : 123,
    title : "Star Wars",  
    languages : ["English" , "American" ]
  } 
}
call HazelcastService::upsert

Storing a Query Result in Hazelcast

When writing the result of a query to Hazelcast, the query’s result format does not need to match the format of the persisted value.

Orbital will automatically adapt the query result to the required persistence format, which may involve projections and even calling additional services if needed.

find { NetflixFilms[] } as {
  // projection not shown
}
call HazelcastService::upsert
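
As a rough sketch of what such a projection might look like, the query below projects each source record into the same shape as the Film model shown earlier. It assumes the NetflixFilms model (not shown in this document) exposes values that Orbital can map onto FilmId, Title and Language, either directly or by calling other services.

// Sketch only: NetflixFilms is not defined in this document, and the
// field names below simply mirror the Film model declared earlier
find { NetflixFilms[] } as {
   filmId : FilmId
   title : Title
   languages : Language[]
}[]
call HazelcastService::upsert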

Streaming data into Hazelcast

stream { FilmUploadedEvent }
// Each FilmUploadedEvent is projected into a Film
// which is the input parameter to `upsert`
call HazelcastService::upsert

Querying data from Hazelcast

Orbital supports querying Hazelcast using both direct key lookups and rich query criteria.

Defining a map to query

To query a map, you first define a service that exposes your Hazelcast Map.

Maps are exposed using Taxi’s table declaration, which indicates a data source that supports rich querying.

Here’s a complete example:

import com.orbitalhq.hazelcast.HazelcastService
import com.orbitalhq.hazelcast.HazelcastMap
import com.orbitalhq.hazelcast.CompactObject

// The HazelcastService annotation should specify the 
// name of the connection defined in your connections.conf
@HazelcastService(connectionName = "myHazelcastConnection")
service HazelcastService {

    // Table is a shorthand to declare
    // a data source that supports rich querying.
    table films : Film[]
}

// Define the name of the map to query.
@HazelcastMap(name = "films")
// specify the serialization format
@CompactObject
closed model Film {
   // A field annotated with @Id becomes the key, which 
   // is used when performing key lookups
   @Id
   filmId : FilmId inherits Int
   title : Title inherits String
   languages: Language[]
}

Writing queries

Once a Hazelcast map is exposed, it can be queried like any other data source. This includes using it when projecting and joining data from other sources (such as APIs, Kafka topics, or databases).

Here are some sample queries:

Fetching everything from a map

find { Film[] }

Fetching a value with a specific key

If the criteria are defined against the key (the field annotated with @Id), then a key lookup is performed:

// Assuming FilmId is annotated as @Id in the Film model
// as shown...
model Film {
   @Id
   filmId : FilmId inherits Int
   // ..snip..
}

// Elsewhere, writing a query...
find { Film( FilmId == 123 ) }

Fetching values using criteria

// find all films with a FilmId < 105
find { Film[]( FilmId < 105 ) }

// find all films released after 2019 with the title Star Wars
find { Film[]( ReleaseYear > 2019 && Title == "Star Wars" ) }
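
Because the map behaves like any other data source, its records can also be enriched in a projection. The sketch below assumes a FilmReviewScore type that is published by some other data source; that type is hypothetical here and is not part of the Film model.

// Sketch only: FilmReviewScore is assumed to be available from
// another data source (an API, for example), not from the Hazelcast map
find { Film[] } as {
   id : FilmId
   title : Title
   reviewScore : FilmReviewScore
}[]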

Streaming data from Hazelcast

Hazelcast Maps can be treated as data streams, where inserts and updates are emitted as a stream of events that can be queried using Orbital.

The following events trigger the current state of the record to be written to the event stream:

  • Entry Added
  • Entry Updated

Declaring a Map as a Stream

To stream updates from a map, you first define a service that exposes your Hazelcast Map as a stream.

Here’s a complete example:

import com.orbitalhq.hazelcast.HazelcastService
import com.orbitalhq.hazelcast.HazelcastMap
import com.orbitalhq.hazelcast.CompactObject

// The HazelcastService annotation should specify the 
// name of the connection defined in your connections.conf
@HazelcastService(connectionName = "myHazelcastConnection")
service HazelcastService {
   stream films : Stream<Film>
}

// Define the name of the map to query.
@HazelcastMap(name = "films")
// specify the serialization format
@CompactObject
closed model Film {
   // A field annotated with @Id becomes the key, which 
   // is used when performing key lookups
   @Id
   filmId : FilmId inherits Int
   title : Title inherits String
   languages: Language[]
}
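
The write, table and stream declarations are shown in separate examples in this document. As a sketch, and assuming all three can be declared together in one service, a combined definition might look like the following. The stream is named filmUpdates here only to avoid clashing with the table name; that name is illustrative and does not come from the examples above.

import com.orbitalhq.hazelcast.HazelcastService
import com.orbitalhq.hazelcast.UpsertOperation

// Sketch only: assumes one service may expose a write operation,
// a table and a stream over the same map. The Film model is the
// one declared above.
@HazelcastService(connectionName = "myHazelcastConnection")
service HazelcastService {
   @UpsertOperation
   write operation upsert(Film):Film

   table films : Film[]

   // Hypothetical member name, chosen to differ from the table
   stream filmUpdates : Stream<Film>
}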

Writing streaming queries

Once a Hazelcast map is exposed as a stream, it can be queried like any other streaming data source. This includes using it when projecting and joining data from other sources (such as APIs, Kafka topics, or databases).

Here are some sample queries. When testing, it helps to write data to Hazelcast while a streaming query is running, as each write triggers a change event that produces a value on the stream.

Stream all updates from a map

stream { Film }
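
To see events arrive while testing, one approach is to leave the streaming query running and then submit a write as a separate query. The sketch below reuses the upsert operation from the writing section, and assumes the same service exposes both the stream and the write operation. The film values are made up for illustration.

// 1. In one query, subscribe to changes on the map
stream { Film }

// 2. In a separate query, write a record. The resulting Entry Added
//    (or Entry Updated) event should emit this Film on the stream above
given { film : Film = 
  {
    filmId : 124,
    title : "The Empire Strikes Back",
    languages : ["English"]
  }
}
call HazelcastService::upsert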

Stream all updates from a map, and enrich with data from other sources

stream { Film } as {
  id : FilmId
  reviewScore : FilmReviewScore // not present in the map, will be looked up from another data source
}[]

Stream only specific events from a map

// Only provide updates on Films whose FilmId is less than 300
stream { Film.filterEach( (FilmId) -> FilmId < 300  ) }