Describing Mongo Database

Orbital can both read data from a Mongo collection to fetch data as part of a query, and write data back to Mongo.

Defining a Mongo connection

Mongo connections are stored in your connections.conf config file, under the mongo element.

The connection specifies how to connect to a Mongo Database.

mongo {
   usersMongo {
      connectionName=usersMongo
      connectionParameters {
        dbName = "sample_mflix"
        connectionString = "mongodb+srv://orbital:PASSWORD@orbital.xxxx.mongodb.net/?retryWrites=true&w=majority&appName=Orbital"
      }
   }
}

The following configuration options are mandatory under connectionParameters:

Config option | Purpose
--- | ---
dbName | Name of the Mongo database.
connectionString | A valid Mongo connection string (see the MongoDB Connection Strings documentation for details).

Defining a Collection mapping

Collections are exposed to Orbital using the annotation @com.orbitalhq.mongo.Collection on a model.

Field names in the model are expected to match the field names in the collection.

Here’s an example:

import com.orbitalhq.mongo.Collection

type FirstName inherits String
type Password inherits String
type Email inherits String

@Collection(connection = "usersMongo", collection = "users")
model User {
      name : FirstName
      password : Password
      email: Email
}

The @Collection annotation contains the following parameters:

Parameter | Description
--- | ---
connection | The name of a connection, as defined in your connections configuration file.
collection | The name of the collection.

It’s possible to use environment variables in these annotations, as described here.

Defining a mongo service

// 1: Add the required imports
import com.orbitalhq.mongo.MongoService

// 2: Annotate the service as a `MongoService`.
@MongoService( connection = "usersMongo" )
service MongoUsersService {
   // 3: expose a table operation for a collection model.
   table user : User[]
}                             

Mapping the ObjectId

Use an @Id annotation to mark the field that represents the Mongo ObjectId.

Here’s an example:

import com.orbitalhq.mongo.Collection

type FirstName inherits String
type Password inherits String
type Email inherits String
type MongoObjectId inherits String

@Collection(connection = "usersMongo", collection = "users")
model UserWithObjectId {
   @Id
   objectId : MongoObjectId
   name : FirstName
   password : Password
   email : Email
}
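
Once the identifier is mapped, it should be usable in query criteria in the same way as the other fields. The following is a minimal sketch, assuming a table operation for UserWithObjectId is exposed on a Mongo service. The id value is a made-up placeholder, not a real document id.

```taxi
// Hypothetical lookup of a single user by its Mongo ObjectId.
// Assumes a service exposes `table mongoUsers : UserWithObjectId[]`.
// The id below is a placeholder value for illustration only.
find { UserWithObjectId[]( MongoObjectId == "65f1c0a2b3d4e5f6a7b8c9d0" ) }
```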

Mapping a Mongo Unique Index

Use the @UniqueIndex annotation to map a Mongo unique index onto your Taxi model.

Here is an example:

import com.orbitalhq.mongo.Collection
import com.orbitalhq.mongo.UniqueIndex

@Collection(connection = "accountsMongo", collection = "accounts")
model Account {
   @UniqueIndex
   accountId: AccountId inherits String
   accountName: AccountName inherits String
}

The example above annotates the accountId field with @UniqueIndex, which tells Orbital that the underlying Mongo accounts collection has a unique index defined on that field. Orbital uses fields annotated with @UniqueIndex when executing upsert operations against the Mongo collection. Consider the following service definition for the above model:

@MongoService( connection = "accountsMongo" )
service AccountsDb {
   table accounts : Account[]
   @UpsertOperation
   write operation upsertAccount(Account):Account   
}

When we issue the following query:

query.taxi
 given { account : Account = { accountId : "1" , accountName: "My USD Account" } }
 call AccountsDb::upsertAccount

Orbital will insert an account with accountId = "1" into the mongo accounts collection. When the following query is issued subsequently:

query.taxi
 given { account : Account = { accountId : "1" , accountName: "My GBP Account" } }
 call AccountsDb::upsertAccount

Orbital will make use of @UniqueIndex and update the existing account data with the new accountName value rather than attempting to insert a new account into the Mongo Collection.
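
One way to check this behaviour is to read the account back after both upserts. This is a sketch only, relying on the accounts table operation declared on AccountsDb above; the expected result is inferred from the description in this section rather than from a recorded run.

```taxi
// Read back the account written by the two upserts above.
// If the second upsert updated rather than inserted, this should return
// a single document for accountId "1", carrying the accountName from the second upsert.
find { Account[]( AccountId == "1" ) }
```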

Mongo setOnInsert Support

To use Mongo's $setOnInsert with @UpsertOperation operations, add the @SetOnInsert annotation to any field that should be set only when a new document is inserted into the corresponding Mongo collection.

Here is an example:

import com.orbitalhq.mongo.Collection
import com.orbitalhq.mongo.UniqueIndex
import com.orbitalhq.mongo.SetOnInsert

@Collection(connection = "accountsMongo", collection = "accounts")
model Account {
   @UniqueIndex
   accountId : AccountId inherits String
   currency : Currency inherits String
   @SetOnInsert
   insertedAt: InsertedAt inherits Instant = now()
   updatedAt: UpdatedAt inherits Instant = now()
}

This model indicates that:

  • insertedAt should be set when a new Account is inserted into the accounts collection
  • subsequent updates to the same account document should not change the initial insertedAt value.
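
The two points above can be sketched as a pair of upserts against this model. The service below mirrors the AccountsDb service from the unique index section and is an assumption here, as is leaving insertedAt and updatedAt out of the given block so that their now() defaults apply.

```taxi
// Assumed service, mirroring the AccountsDb service shown earlier on this page.
@MongoService( connection = "accountsMongo" )
service AccountsDb {
   table accounts : Account[]

   @UpsertOperation
   write operation upsertAccount(Account):Account
}

// Query 1 (run first): no document exists for accountId "1",
// so one is inserted and insertedAt is written.
given { account : Account = { accountId : "1", currency : "USD" } }
call AccountsDb::upsertAccount

// Query 2 (run later, as a separate query): the document already exists,
// so it is updated. insertedAt should keep its original value,
// while currency and updatedAt take the new values.
given { account : Account = { accountId : "1", currency : "GBP" } }
call AccountsDb::upsertAccount
```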

Querying Collections

To expose a Mongo database as a source for queries, declare a service annotated with @MongoService that exposes a table operation for each collection model you want to query.

// 1: Add the required imports
import com.orbitalhq.mongo.MongoService

// 2: Annotate the service as a `MongoService`.
@MongoService( connection = "usersMongo" )
service MongoUsersService {
   // 3: expose a table operation for a collection model.
   table user : User[]
   table mongoUsers: UserWithObjectId[]
}                             

The @MongoService annotation contains the following parameters:

Parameter | Description
--- | ---
connection | The name of a connection, as defined in your connections configuration file.

Sample queries

Fetch everything from a collection

find { User[] }

Fetch values by criteria

find { User[]( FirstName == "Harry" ) }
find { User[]( FirstName == "Harry" || FirstName == "Joe" ) }

Using IN and NOT IN operators

MongoDB collections support in and not in operators for efficient filtering against multiple values. These operators work with any field type and generate optimized MongoDB queries.

Filtering with IN operator

To find documents where a field matches any value in a list:

// Find users with specific IDs
find { User[]( UserId in [1, 2, 3] ) }
// Find users from specific countries
find { User[]( Country in ["US", "UK", "CA"] ) }

Filtering with NOT IN operator

To find documents where a field does not match any value in a list:

// Find users excluding specific IDs
find { User[]( UserId not in [1, 2] ) }
// Find users not from specific countries
find { User[]( Country not in ["US", "UK"] ) }

Loading documents based on data from other services

A powerful pattern is to use in operators with data fetched from other services. This enables efficient bulk lookups:

// Given these models:

model Family {
  id : FamilyId inherits Int
  members : PersonId[]  // Array of person IDs
}


service FamilyApi {
  operation getFamily(FamilyId):Family
}

@Collection(connection = "usersMongo", collection = "people")
model Person {
  @Id
  id : PersonId inherits Int
  name : Name inherits String
}

@MongoService( connection = "usersMongo" )
service UsersDb {
  table people : Person[]
}

You can fetch family data from an API, then load all family members from MongoDB in a single query:

// Load all people belonging to family ID 1
given { FamilyId = 1 }
find { Person[]( PersonId in Family::PersonId[] ) }

This pattern works with any data transformation. For example, if family members are nested objects:

model FamilyMember {
  id : PersonId
}

model Family {
  id : FamilyId inherits Int
  members : FamilyMember[]
}

// Extract person IDs from nested objects and load matching people
given { FamilyId = 1 }
find { Person[]( PersonId in Family::FamilyMember[].map( (FamilyMember) -> PersonId ) ) }

Combining with other conditions

IN and NOT IN operators can be combined with other MongoDB query operators:

// Find active users from specific countries
find { User[]( Country in ["US", "UK", "CA"] && Status == "ACTIVE" ) }
// Find users either VIP or from specific regions
find { User[]( UserType == "VIP" || Country in ["US", "UK"] ) }

Writing data to a collection

To expose a database collection for writes, you need to provide a write operation in a service.

Here’s a complete example schema with corresponding write operations:

type FlightCode inherits String
type DepartureTime inherits Instant
type DepartureAirport inherits String
type ArrivalAirport inherits String
type MongoObjectId inherits String

type AirlineCode inherits String
type AirlineName inherits String
type StarAllianceMember inherits Boolean

model Airline {
   code: AirlineCode
   name: AirlineName
   starAlliance: StarAllianceMember
}

@Collection(connection = "flightsMongo", collection = "flightInfo")
model FlightInfo {
   code: FlightCode
   depTime : DepartureTime
   arrival: ArrivalAirport
   airline: Airline
}

@Collection(connection = "flightsMongo", collection = "flightInfo")
model FlightInfoWithObjectId {
   @Id
   objectId: MongoObjectId?
   code: FlightCode
   departure: DepartureAirport
   arrival: ArrivalAirport
   airline: Airline
}

@MongoService( connection = "flightsMongo" )
service FlightsDb {
   table FlightInfo : FlightInfo[]
   table mongoFlights: FlightInfoWithObjectId[]

   // This is effectively an insert, as FlightInfo does not have an @Id annotation.
   @UpsertOperation
   write operation insertFlight(FlightInfo):FlightInfo

   // If the objectId field is populated, this updates the matching document in the collection.
   // Otherwise, it inserts the provided FlightInfoWithObjectId instance into the collection.
   @UpsertOperation
   write operation upsertFlightWithObjectId(FlightInfoWithObjectId):FlightInfoWithObjectId
}

Sample mutating queries

Inserting data

This example shows inserting data into a Mongo collection.

Note that the objectId is null, allowing Mongo to assign an Id.

given { flight : FlightInfoWithObjectId = {
    objectId : null , 
    code : "TK 1989", 
    departure: "IST", 
    arrival: "LHR", 
    airline: { code: "TK", name: "Turkish Airlines", starAlliance: true} 
  } 
}
call FlightsDb::upsertFlightWithObjectId
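
The insertFlight operation from the schema above can be called in the same way. This is a sketch under stated assumptions: the depTime value is a made-up timestamp, and it is assumed that an Instant can be supplied as an ISO-8601 string.

```taxi
// FlightInfo has no @Id field, so this call always inserts a new document.
// The depTime value is a hypothetical timestamp, written as an ISO-8601 string.
given { flight : FlightInfo = {
    code : "TK 1989",
    depTime : "2024-05-01T09:30:00Z",
    arrival : "LHR",
    airline : { code : "TK", name : "Turkish Airlines", starAlliance : true }
  }
}
call FlightsDb::insertFlight
```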

Updating data

This example updates an existing document. Because objectId is populated, the upsert updates the matching document rather than inserting a new one.

given { flight : FlightInfoWithObjectId = {
    objectId : "7df78ad8902ce46d" , 
    code : "TK 1990", 
    departure: "IST", 
    arrival: "LHR", 
    airline: { code: "TK", name: "Turkish Airlines", starAlliance: true} 
  } 
}
call FlightsDb::upsertFlightWithObjectId

Batching Upsert Operations

Rather than upserting one record at a time, you can batch upsert operations. To do so, specify batch settings on your @UpsertOperation annotation:

@UpsertOperation(batchSize = 100, batchDuration = 10000)
write operation upsertFlightWithObjectId(FlightInfoWithObjectId):FlightInfoWithObjectId

  • batchSize is the number of records to buffer before performing a write.
  • batchDuration is the maximum time to buffer records, in milliseconds.

When the @UpsertOperation annotation defines these attributes, Orbital buffers upserts and flushes the buffer to Mongo whenever the buffer reaches batchSize records or batchDuration elapses.

Streaming data from Kafka into Mongo

This example shows streaming stock price updates from a Kafka topic directly into Mongo, upserting each update by its symbol.

import com.orbitalhq.kafka.KafkaService
import com.orbitalhq.kafka.KafkaOperation

// Kafka model and service emitting prices:
// The price type is named CurrentPrice so that it does not clash with the StockPrice model.
model StockPrice {
  symbol: StockSymbol inherits String
  currentPrice : CurrentPrice inherits Decimal
}

@KafkaService( connectionName = "market-prices" )
service MyKafkaService {
  stream prices : Stream<StockPrice>
}

// Mongo model and service for saving prices:
@Collection(connection = "stockPricesMongoDb", collection = "stockPrices")
closed parameter model SavedStockPrice {
   @Id
   symbol : StockSymbol
   currentPrice : CurrentPrice
   timestamp : Instant = now()
}

@MongoService( connection = "stockPricesMongoDb" )
service StockPricesMongoService {
   table prices: SavedStockPrice[]

   @UpsertOperation
   write operation updatePrice(SavedStockPrice):SavedStockPrice
}

Given the above, the following query will save updated Kafka ticks into Mongo:

stream { StockPrice }
call StockPricesMongoService::updatePrice
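
For a high-volume topic, the batching settings described under Batching Upsert Operations could be applied to the same write operation. The sketch below is an illustration only: the batchSize and batchDuration values are arbitrary assumptions, not recommended settings.

```taxi
@MongoService( connection = "stockPricesMongoDb" )
service StockPricesMongoService {
   table prices: SavedStockPrice[]

   // Assumed settings: flush once 500 ticks are buffered,
   // or once 2000 milliseconds have elapsed.
   @UpsertOperation(batchSize = 500, batchDuration = 2000)
   write operation updatePrice(SavedStockPrice):SavedStockPrice
}
```

The streaming query itself stays the same; only the annotation on the write operation changes.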

Building a REST API that reads from Mongo

This is a full example, where we create an HTTP endpoint accepting a GET request with a ticker symbol.

We’ll reuse the model and services declared in Streaming data from Kafka into Mongo, to avoid redeclaring them here.

@HttpOperation(url = "/api/q/stockPrices/{symbol}", method = "GET")
query FetchStockPrices(@PathVariable("symbol") symbol:StockSymbol) {
  find { SavedStockPrice( StockSymbol == symbol ) }
}
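
A companion endpoint that returns every saved price could follow the same shape. This is a hypothetical sketch: the query name FetchAllStockPrices and its URL are invented for illustration, and it relies on the prices table operation declared on StockPricesMongoService.

```taxi
// Hypothetical endpoint: returns all documents in the stockPrices collection.
@HttpOperation(url = "/api/q/stockPrices", method = "GET")
query FetchAllStockPrices {
  find { SavedStockPrice[] }
}
```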