Managing data sources
Describing Mongo Database
Orbital can both read data from a Mongo collection to fetch data as part of a query, and write data back to Mongo.
Defining a connection to your Mongo Database
Mongo connections are stored in your connections.conf
config file, under the mongo
element.
The connection specifies how to connect to a Mongo Database.
mongo {
usersMongo {
connectionName=usersMongo
connectionParameters {
dbName = "sample_mflix"
connectionString = "mongodb+srv://orbital:PASSWORD@orbital.xxxx.mongodb.net/?retryWrites=true&w=majority&appName=Orbital"
}
}
}
The following configuration options are mandatory under the connectionParameters
Config option | Purpose |
---|---|
dbName | Name of the Mongo Database. |
connectionString | A Valid Mongo Connection String (see Mongo Db Connection Strings for details.) |
Defining a Collection mapping
Collections are exposed to Orbital using the annotation @com.orbitalhq.mongo.Collection
on a model.
Fields names in the model are expected to align with field names from the collection.
Here’s an example:
import com.orbitalhq.mongo.Collection
type FirstName inherits String
type Password inherits String
type Email inherits String
@Collection(connection = "usersMongo", collection = "users")
model User {
name : FirstName
password : Password
email: Email
}
The @Collection
annotation contains the following parameters:
Parameter | Description |
---|---|
connection | The name of a connection, as defined in your connections configuration file. |
collection | The name of the collection |
Mapping the ObjectId
Use an @Id
annotation to define the column that represents the Mongo ObjectId.
Here’s an example:
import com.orbitalhq.mongo.Collection
type FirstName inherits String
type Password inherits String
type Email inherits String
type MongoObjectId inherits String
@Collection(connection = "usersMongo", collection = "users")
model UserWithObjectId {
@Id
objectId: MongoObjectId
name : FirstName
password : Password
email: Email
}
Mapping a Mongo Unique Index
Use @UniqueIndex
annotation to map a Mongo Unique index to your taxi model
Here is an example:
import com.orbitalhq.mongo.Collection
import com.orbitalhq.mongo.UniqueIndex
@Collection(connection = "accountsMongo", collection = "accounts")
model Account {
@UniqueIndex
accountId: AccountId inherits String
accountName: AccountName inherits String
}
Above example annotates accountId
field with @UniqueIndex
which implies that the underlying mongo collection accounts
has a Mongo Unique index defined on it.
Orbital consider the field annotated with @UniqueIndex
when exeucting upsert
operations against the Mongo Collection. Consider the following service
definition for the above model:
@MongoService( connection = "accountsMongo" )
service AccountsDb {
table accounts : Account[]
@UpsertOperation
write operation upsertAccount(Account):Account
}
When we issue the following query:
given { account : Account = { accountId : "1" , accountName: "My USD Account" } }
call AccountsDb::upsertAccount
Orbital will insert an account with accountId = "1"
into the mongo accounts
collection. When the following query is issued subsequently:
given { account : Account = { accountId : "1" , accountName: "My GPB Account" } }
call AccountsDb::upsertAccount
Orbital will make use of @UniqueIndex
and update the existing account
data with the new accountName
value rather than attempting to insert a new account into the Mongo Collection.
Mongo $setOnInsert Support:
If you need to leverage $setOnInsert for your @Upsert
operations, you can use @SetOnInsert
annotation on the fields
that needs to be set only when inserting a new document into the corresponding Mongo Collection.
Here is an example:
import com.orbitalhq.mongo.Collection
import com.orbitalhq.mongo.UniqueIndex
import com.orbitalhq.mongo.SetOnInsert
@Collection(connection = "accountsMongo", collection = "accounts")
model Account {
@UniqueIndex
accountId : AccountId inherits String
currency : Currency inherits String
@SetOnInsert
insertedAt: InsertedAt inherits Instant = now()
updatedAt: UpdateAt inherits Instant = now()
}
Above taxi model expresses the fact that insertedAt
field should be set when a new Account
is inserted into the accounts
collection and subsequent updates to the same account
document should not change the initial insertedAt
value.
Querying Collections
To expose a Mongo database as a source for queries, the database must have a service and table operation exposed for a collection.
// 1: Add the required imports
import com.orbitalhq.mongo.MongoService
// 2: Annotate the service as a `MongoService`.
@MongoService( connectionName = "usersMongo" )
service MongoUsersService {
// 3: expose a table operation for a collection model.
table user : User[]
table mongoUsers: UserWithObjectId[]
}
The @MongoService
annotation contains the following parameters:
Parameter | Description |
---|---|
connection | The name of a connection, as defined in yourconnections configuration file. |
Sample queries
Fetch everything from a collection:
find { User[] }
Fetch values by criteria:
find { User[]( FirstName == "Harry" ) }
find { User[]( FirstName == "Harry" || FirstName == "Joe" ) }
Writing data to a collection
To expose a database collection for writes, you need to provide a write operation
in a service.
Here’s a complete example schema with corresponding write
operations:
type FlightCode inherits String
type DepartureTime inherits Instant
type DepartureAirport inherits String
type ArrivalAirport inherits String
type MongoObjectId inherits String
type AirlineCode inherits String
type AirlineName inherits String
type StarAllianceMember inherits Boolean
model Airline {
code: AirlineCode
name: AirlineName
starAlliance: StarAllianceMember
}
@Collection(connection = "flightsMongo", collection = "flightInfo")
model FlightInfo {
code: FlightCode
depTime : DepartureTime
arrival: ArrivalAirport
airline: Airline
}
@Collection(connection = "flightsMongo", collection = "flightInfo")
model FlightInfoWithObjectId {
@Id
objectId: MongoObjectId?
code: FlightCode
departure: DepartureAirport
arrival: ArrivalAirport
airline: Airline
}
@MongoService( connection = "flightsMongo" )
service FlightsDb {
table FlightInfo : FlightInfo[]
table mongoFlights: FlightInfoWithObjectId[]
// This is effectively Insert as the FlightInfo does not have @Id annotation.
@UpsertOperation
write operation insertFlight(FlightInfo):FlightInfo
// If objectId field is populated, this will update the matching item in the collection.
// Otherwise it will insert that provided FlightInfoWithObjectId instance into the collection.
@UpsertOperation
write operation upsertFlightWithObjectId(FlightInfoWithObjectId):FlightInfoWithObjectId
}
Sample mutating queries
Inserting data
This example shows inserting data into a Mongo collection.
Note that the objectId
is null
, allowing Mongo to assign an Id.
given { movie : FlightInfoWithObjectId = {
objectId : null ,
code : "TK 1989",
departure: "IST",
arrival: "LHR",
airline: { code: "TK", name: "Turkish Airlines", starAlliance: true}
}
}
call FlightsDb::upsertFlightWithObjectId
Updating data
given { movie : FlightInfoWithObjectId = {
objectId : "7df78ad8902ce46d" ,
code : "TK 1990",
departure: "IST",
arrival: "LHR",
airline: { code: "TK", name: "Turkish Airlines", starAlliance: true}
}
}
call FlightsDb::upsertFlightWithObjectId
Batching Upsert Operations
Rather than upserting one record at a time, you can batch upsert operations, just update your @UpsertOperation
to specify batch settings:
// If objectId field is populated, this will update the matching item in the collection.
// Otherwise it will insert that provided FlightInfoWithObjectId instance into the collection.
@UpsertOperation(batchSize = 1, batchDuration = 10000)
write operation upsertFlightWithObjectId(FlightInfoWithObjectId):FlightInfoWithObjectId
batchDuration
is specified in milliseconds and batchSize
specifies the buffer size. When UpsertOperation
annotation has these attributes defined, Orbital buffers upserts and flushes the buffer to Mongo each time the buffer reaches batchSize
or batchDuration
elapses.
Streaming data from Kafka into Mongo
This example shows streaming stock price updates from a Kafka topic directly into Mongo, updating based off the symbol
import com.orbitalhq.kafka.KafkaService
import com.orbitalhq.kafka.KafkaOperation
// Kafka model and service emitting prices:
model StockPrice {
symbol: StockSymbol inherits String
currentPrice : StockPrice inherits Decimal
}
@KafkaService( connectionName = "market-prices" )
service MyKafkaService {
stream prices : Stream<StockPrice>
}
// Mongo model and service for saving prices:
@Collection(connection = "stockPricesMongoDb", collection = "stockPrices")
closed parameter model SavedStockPrice {
@Id
symbol : StockSymbol
currentPrice : StockPrice
timestamp : Instant = now()
}
@MongoService( connection = "stockPricesMongoDb" )
service StockPricesMongoService {
table prices: SavedStockPrice[]
@UpsertOperation
write operation updatePrice(SavedStockPrice):SavedStockPrice
}
Given the above, the following query will save updated Kafka ticks into Mongo:
stream { StockPrice }
call StockPricesMongoService::updatePrice
Building a REST API that reads from Mongo
This is a full example, where we create an HTTP endpoint accepting a GET
request
with a ticker symbol.
We’ll use the same model and services declared in Streaming data from Kafka to Mongo, to avoid redeclaring them here.
@HttpOperation(url = "/api/q/stockPrices/{symbol}", method = "GET")
query FetchStockPrices(@PathVariable("symbol") symbol:StockSymbol) {
find { SavedStockPrice( StockSymbol == symbol) }
}