Managing data sources
Working with Mongo
Orbital can read data from a Mongo collection as part of a query, and write data back to Mongo.
Defining a Mongo connection
Mongo connections are stored in your connections.conf config file, under the mongo element.
The connection specifies how to connect to a Mongo Database.
mongo {
usersMongo {
connectionName=usersMongo
connectionParameters {
dbName = "sample_mflix"
connectionString = "mongodb+srv://orbital:PASSWORD@orbital.xxxx.mongodb.net/?retryWrites=true&w=majority&appName=Orbital"
}
}
}
The following configuration options are mandatory under connectionParameters:
Config option | Purpose |
---|---|
dbName | Name of the Mongo database. |
connectionString | A valid Mongo connection string (see Mongo Db Connection Strings for details). |
Defining a Collection mapping
Collections are exposed to Orbital using the annotation @com.orbitalhq.mongo.Collection on a model.
Field names in the model are expected to align with field names from the collection.
Here’s an example:
import com.orbitalhq.mongo.Collection
type FirstName inherits String
type Password inherits String
type Email inherits String
@Collection(connection = "usersMongo", collection = "users")
model User {
name : FirstName
password : Password
email: Email
}
The @Collection annotation contains the following parameters:
Parameter | Description |
---|---|
connection | The name of a connection, as defined in your connections configuration file. |
collection | The name of the collection |
It’s possible to use environment variables in these annotations, as described here.
Defining a mongo service
// 1: Add the required imports
import com.orbitalhq.mongo.MongoService
// 2: Annotate the service as a `MongoService`.
@MongoService( connection = "usersMongo" )
service MongoUsersService {
// 3: expose a table operation for a collection model.
table user : User[]
}
Mapping the ObjectId
Use an @Id annotation to define the field that represents the Mongo ObjectId.
Here’s an example:
import com.orbitalhq.mongo.Collection
type FirstName inherits String
type Password inherits String
type Email inherits String
type MongoObjectId inherits String
@Collection(connection = "usersMongo", collection = "users")
model UserWithObjectId {
@Id
_id: MongoObjectId
name : FirstName
password : Password
email: Email
}
In MongoDB, every document must have an _id field that serves as the primary key.
If you don’t provide one, MongoDB will automatically generate an ObjectId for this field.
While you can use other fields as identifiers in your application, the _id field will always be present and serves as the document’s unique identifier within the collection.
Orbital will let you assign an alternative name for the _id field in your model, but this isn’t recommended, as it can lead to confusion when examining the actual collection, or building native pipelines.
You can assign a different name to the column in your model by simply changing the name of the field:
@Collection(connection = "usersMongo", collection = "users")
model User {
@Id
userId: MongoObjectId
name : FirstName
password : Password
email: Email
}
When querying data from User, results will show the _id column named userId.
However, it’s recommended that you use projections in a TaxiQL query to rename fields, rather than introducing mapping in your model.
Avoid renaming the id field
Alternatively, omit the _id field from your Taxi model, and use a @UniqueIndex if you have one defined. Read more about Unique indexes in mongo.
Mapping a Mongo Unique Index
Use the @UniqueIndex annotation to map a Mongo unique index to your Taxi model.
Here is an example:
import com.orbitalhq.mongo.Collection
import com.orbitalhq.mongo.UniqueIndex
@Collection(connection = "accountsMongo", collection = "accounts")
model Account {
@UniqueIndex
accountId: AccountId inherits String
accountName: AccountName inherits String
}
The above example annotates the accountId field with @UniqueIndex, which indicates that the underlying mongo collection accounts has a Mongo unique index defined on it.
Orbital doesn't create the index
The @UniqueIndex annotation does not create the index on the database; the index is expected to exist already.
Orbital considers the field annotated with @UniqueIndex when executing upsert operations against the Mongo collection. Consider the following service definition for the above model:
@MongoService( connection = "accountsMongo" )
service AccountsDb {
table accounts : Account[]
@UpsertOperation
write operation upsertAccount(Account):Account
}
When we issue the following query:
given { account : Account = { accountId : "1" , accountName: "My USD Account" } }
call AccountsDb::upsertAccount
Orbital will insert an account with accountId = "1" into the mongo accounts collection. When the following query is issued subsequently:
given { account : Account = { accountId : "1" , accountName: "My GBP Account" } }
call AccountsDb::upsertAccount
Orbital will use the @UniqueIndex and update the existing account data with the new accountName value rather than attempting to insert a new account into the Mongo collection.
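The matching behavior described above can be modeled in a few lines of Python. This is only a sketch of the semantics, not Orbital's implementation: a plain list stands in for the accounts collection, and the `upsert` helper is hypothetical.

```python
# In-memory stand-in for the "accounts" collection (assumption: no real Mongo client).
accounts = []

def upsert(collection, document, unique_field):
    """Update the document whose unique_field matches, otherwise insert a new one."""
    for existing in collection:
        if existing[unique_field] == document[unique_field]:
            existing.update(document)  # match found: overwrite fields in place
            return "updated"
    collection.append(dict(document))  # no match: insert a copy
    return "inserted"

first = upsert(accounts, {"accountId": "1", "accountName": "My USD Account"}, "accountId")
second = upsert(accounts, {"accountId": "1", "accountName": "My GBP Account"}, "accountId")
print(first, second, accounts)
```

The second call finds the existing document by accountId, so the collection still holds a single account afterwards.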
Id vs UniqueIndex
The choice between @Id and @UniqueIndex depends on the design of your Mongo document.
Mongo always creates a field called _id, which is a primary key. You can store any string-based value in this field, provided it is unique:
@Collection(connection = "accountsMongo", collection = "accounts")
model Account {
@Id
_id: AccountId inherits String
accountName: AccountName inherits String
}
If you’d rather use a field named something other than _id for your primary key, then use @UniqueIndex:
import com.orbitalhq.mongo.UniqueIndex
@Collection(connection = "accountsMongo", collection = "accounts")
model Account {
@UniqueIndex
accountId: AccountId inherits String
accountName: AccountName inherits String
}
Other considerations:
- If a model declares a @UniqueIndex, this field is used for matching on upsert operations
- Do not combine both @Id and @UniqueIndex
Mongo setOnInsert Support
To leverage $setOnInsert for @UpsertOperation operations, add the @SetOnInsert annotation to the field that needs to be set only when inserting a new document into the corresponding Mongo collection.
Here is an example:
import com.orbitalhq.mongo.Collection
import com.orbitalhq.mongo.UniqueIndex
import com.orbitalhq.mongo.SetOnInsert
@Collection(connection = "accountsMongo", collection = "accounts")
model Account {
@UniqueIndex
accountId : AccountId inherits String
currency : Currency inherits String
@SetOnInsert
insertedAt: InsertedAt inherits Instant = now()
updatedAt: UpdatedAt inherits Instant = now()
}
This model indicates that:
- insertedAt should be set when a new Account is inserted into the accounts collection
- subsequent updates to the same account document should not change the initial insertedAt value.
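As a rough illustration of these two rules, the sketch below models insert-only fields with an in-memory list. The helper name and the integer timestamps are assumptions made for the example (real values would be Instants), and it does not show how Orbital builds the actual Mongo update.

```python
def upsert_with_insert_only(collection, document, unique_field, insert_only):
    """Upsert `document`; fields named in `insert_only` are written only on insert."""
    for existing in collection:
        if existing[unique_field] == document[unique_field]:
            for name, value in document.items():
                if name not in insert_only:  # skip fields reserved for the first insert
                    existing[name] = value
            return existing
    stored = dict(document)  # no match: store every field, including insert-only ones
    collection.append(stored)
    return stored

accounts = []
upsert_with_insert_only(
    accounts,
    {"accountId": "1", "currency": "USD", "insertedAt": 100, "updatedAt": 100},
    "accountId", {"insertedAt"},
)
upsert_with_insert_only(
    accounts,
    {"accountId": "1", "currency": "GBP", "insertedAt": 200, "updatedAt": 200},
    "accountId", {"insertedAt"},
)
print(accounts)
```

After the second call, currency and updatedAt carry the new values while insertedAt keeps the value from the first insert.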
Querying Collections
To expose a Mongo database as a source for queries, the database must have a service and table operation exposed for a collection.
// 1: Add the required imports
import com.orbitalhq.mongo.MongoService
// 2: Annotate the service as a `MongoService`.
@MongoService( connection = "usersMongo" )
service MongoUsersService {
// 3: expose a table operation for a collection model.
table user : User[]
table mongoUsers: UserWithObjectId[]
}
The @MongoService annotation contains the following parameters:
Parameter | Description |
---|---|
connection | The name of a connection, as defined in your connections configuration file. |
Sample queries
Fetch everything from a collection
find { User[] }
Fetch values by criteria
find { User[]( FirstName == "Harry" ) }
find { User[]( FirstName == "Harry" || FirstName == "Joe" ) }
Using IN and NOT IN operators
MongoDB collections support in and not in operators for efficient filtering against multiple values. These operators work with any field type and generate optimized MongoDB queries.
Filtering with IN operator
To find documents where a field matches any value in a list:
// Find users with specific IDs
find { User[]( UserId in [1, 2, 3] ) }
// Find users from specific countries
find { User[]( Country in ["US", "UK", "CA"] ) }
Filtering with NOT IN operator
To find documents where a field does not match any value in a list:
// Find users excluding specific IDs
find { User[]( UserId not in [1, 2] ) }
// Find users not from specific countries
find { User[]( Country not in ["US", "UK"] ) }
Loading documents based on data from other services
A powerful pattern is to use in operators with data fetched from other services. This enables efficient bulk lookups:
// Given these models:
model Family {
id : FamilyId inherits Int
members : PersonId[] // Array of person IDs
}
service FamilyApi {
operation getFamily(FamilyId):Family
}
@Collection(connection = "usersMongo", collection = "people")
model Person {
@Id
id : PersonId inherits Int
name : Name inherits String
}
@MongoService( connection = "usersMongo" )
service UsersDb {
table people : Person[]
}
You can fetch family data from an API, then load all family members from MongoDB in a single query:
// Load all people belonging to family ID 1
given { FamilyId = 1 }
find { Person[]( PersonId in Family::PersonId[] ) }
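Conceptually, the query above performs two steps: one call to fetch the family, then one bulk lookup for all of its members. The Python sketch below mirrors that flow with in-memory data; the function name and the sample records are hypothetical, and the real work is done by Orbital and MongoDB rather than by code like this.

```python
def load_people_for_family(family_id, get_family, people):
    """Fetch a family once, then select every person whose id is in its member list."""
    family = get_family(family_id)       # single call to the (hypothetical) family API
    wanted_ids = set(family["members"])  # the PersonId[] on the right-hand side of `in`
    return [person for person in people if person["id"] in wanted_ids]

# Sample data standing in for FamilyApi and the people collection.
families = {1: {"id": 1, "members": [10, 11]}}
people = [
    {"id": 10, "name": "Ann"},
    {"id": 11, "name": "Bob"},
    {"id": 12, "name": "Cy"},
]

family_members = load_people_for_family(1, families.__getitem__, people)
print(family_members)
```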
This pattern works with any data transformation. For example, if family members are nested objects:
model FamilyMember {
id : PersonId
}
model Family {
id : FamilyId inherits Int
members : FamilyMember[]
}
// Extract person IDs from nested objects and load matching people
given { FamilyId = 1 }
find { Person[]( PersonId in Family::FamilyMember[].map( (FamilyMember) -> PersonId ) ) }
Combining with other conditions
IN and NOT IN operators can be combined with other MongoDB query operators:
// Find active users from specific countries
find { User[]( Country in ["US", "UK", "CA"] && Status == "ACTIVE" ) }
// Find users either VIP or from specific regions
find { User[]( UserType == "VIP" || Country in ["US", "UK"] ) }
Writing data to a collection
To expose a database collection for writes, you need to provide a write operation in a service.
Here’s a complete example schema with corresponding write operations:
type FlightCode inherits String
type DepartureTime inherits Instant
type DepartureAirport inherits String
type ArrivalAirport inherits String
type MongoObjectId inherits String
type AirlineCode inherits String
type AirlineName inherits String
type StarAllianceMember inherits Boolean
model Airline {
code: AirlineCode
name: AirlineName
starAlliance: StarAllianceMember
}
@Collection(connection = "flightsMongo", collection = "flightInfo")
model FlightInfo {
code: FlightCode
depTime : DepartureTime
arrival: ArrivalAirport
airline: Airline
}
@Collection(connection = "flightsMongo", collection = "flightInfo")
model FlightInfoWithObjectId {
@Id
objectId: MongoObjectId?
code: FlightCode
departure: DepartureAirport
arrival: ArrivalAirport
airline: Airline
}
@MongoService( connection = "flightsMongo" )
service FlightsDb {
table FlightInfo : FlightInfo[]
table mongoFlights: FlightInfoWithObjectId[]
// This is effectively Insert as the FlightInfo does not have @Id annotation.
@UpsertOperation
write operation insertFlight(FlightInfo):FlightInfo
// If objectId field is populated, this will update the matching item in the collection.
// Otherwise it will insert that provided FlightInfoWithObjectId instance into the collection.
@UpsertOperation
write operation upsertFlightWithObjectId(FlightInfoWithObjectId):FlightInfoWithObjectId
}
Batching Upsert Operations
Rather than upserting one record at a time, you can batch upsert operations. Update your @UpsertOperation annotation to specify batch settings:
@UpsertOperation(batchSize = 100, batchDuration = 10000)
write operation upsertFlightWithObjectId(FlightInfoWithObjectId):FlightInfoWithObjectId
- batchDuration is specified in milliseconds
- batchSize specifies the number of records to batch before performing a write
When the @UpsertOperation annotation has these attributes defined, Orbital buffers upserts and flushes the buffer to Mongo each time the buffer reaches batchSize or batchDuration elapses.
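The size-or-time flush rule can be sketched as a small buffer class. This is an illustrative model, not Orbital's code: the `UpsertBuffer` class is hypothetical, and it assumes the duration is measured from the first record buffered after a flush, which the text above does not specify.

```python
import time

class UpsertBuffer:
    """Collects records and hands them to write_batch in groups."""

    def __init__(self, batch_size, batch_duration_ms, write_batch, clock=time.monotonic):
        self._batch_size = batch_size
        self._batch_duration_ms = batch_duration_ms
        self._write_batch = write_batch
        self._clock = clock  # returns seconds; injectable so the example is deterministic
        self._records = []
        self._window_start = clock()

    def add(self, record):
        if not self._records:
            self._window_start = self._clock()  # assumption: timer starts at first record
        self._records.append(record)
        self.tick()

    def tick(self):
        """Flush if the buffer is full or the batch duration has elapsed."""
        if not self._records:
            return
        elapsed_ms = (self._clock() - self._window_start) * 1000
        if len(self._records) >= self._batch_size or elapsed_ms >= self._batch_duration_ms:
            self._write_batch(list(self._records))
            self._records.clear()

# Usage with a fake clock instead of real time.
now = [0.0]
written = []
buffer = UpsertBuffer(3, 10000, written.append, clock=lambda: now[0])
for record in ("a", "b", "c"):
    buffer.add(record)  # the third add reaches batch_size and flushes
buffer.add("d")
now[0] = 10.0           # ten seconds later
buffer.tick()           # duration elapsed, so the partial batch flushes
print(written)
```

A larger batchSize reduces the number of writes, while batchDuration bounds how long a partially filled buffer can wait.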
Deleting data from a collection
Orbital supports two types of delete operations for MongoDB collections: simple deletes using @DeleteOperation and complex filtered deletes using @DeleteByQuery.
Simple Delete Operations
Use @DeleteOperation for straightforward deletes that target documents by their @Id or @UniqueIndex fields.
type DeleteCount inherits Int
model DeleteResult {
deletedCount: DeleteCount
}
@MongoService(connection = "usersMongo")
service UserService {
@DeleteOperation
write operation deleteUser(User): DeleteResult
@DeleteOperation
write operation deleteUsers(User[]): DeleteResult
}
The @DeleteOperation annotation supports both single document and multiple document deletes:
- Single delete: Pass a single model instance. Orbital will delete the document matching the @Id or @UniqueIndex field.
- Multiple delete: Pass an array of model instances. Orbital will delete all documents that match the @Id or @UniqueIndex fields of the provided instances.
Requirements for Delete Operations:
- Your model must have either an @Id or @UniqueIndex annotation
- The return type should be DeleteResult to get information about how many documents were deleted
Complex Delete Operations with @DeleteByQuery
Use @DeleteByQuery for advanced delete operations that require complex filtering using MongoDB query syntax.
@MongoService(connection = "productsMongo")
service ProductService {
@DeleteByQuery(
collection = "products",
filter = '{ categoryId: :categoryId, price: { $gte: :minPrice } }'
)
write operation deleteExpensiveInCategory(
categoryId: CategoryId,
minPrice: Price
): DeleteResult
@DeleteByQuery(
collection = "products",
filter = '{}'
)
write operation deleteAllProducts(): DeleteResult
}
The @DeleteByQuery annotation contains the following parameters:
Parameter | Description |
---|---|
collection | The name of the MongoDB collection to delete from |
filter | MongoDB filter query using standard MongoDB query syntax. Use :parameterName for parameter substitution |
Parameter Substitution:
Parameters in the filter query are substituted using the :parameterName syntax. Orbital will replace these placeholders with the actual parameter values from your operation.
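To make the placeholder idea concrete, here is a minimal Python sketch that swaps each :parameterName for a JSON-encoded value. It is an approximation for illustration only: the `bind_parameters` helper and its regular expression are assumptions, and Orbital's actual serialization is not described here.

```python
import json
import re

# A colon followed directly by an identifier, and not preceded by a word character.
PLACEHOLDER = re.compile(r"(?<!\w):([A-Za-z_]\w*)")

def bind_parameters(template, parameters):
    """Replace each :name placeholder with the JSON encoding of its value."""
    def encode(match):
        return json.dumps(parameters[match.group(1)])  # KeyError if a parameter is missing
    return PLACEHOLDER.sub(encode, template)

bound = bind_parameters(
    "{ categoryId: :categoryId, price: { $gte: :minPrice } }",
    {"categoryId": "electronics", "minPrice": 500},
)
print(bound)
```

Encoding values instead of pasting raw text means a string containing quotes or braces stays a single string literal in the resulting filter.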
MongoDB Query Operators: You can use any valid MongoDB query operators in your filter:
@DeleteByQuery(
collection = "products",
filter = '''{
$or: [
{ categoryId: :category1 },
{ price: { $lt: :maxPrice } }
]
}'''
)
write operation deleteByComplexCriteria(
category1: CategoryId,
maxPrice: Price
): DeleteResult
Native MongoDB aggregations
For complex data transformations that go beyond Orbital’s standard table operations, you can use native MongoDB aggregation pipelines as an “escape hatch”. This allows you to leverage the full power of MongoDB’s aggregation framework while maintaining type safety and semantic integration with your Orbital queries.
Basic syntax
Use the @CollectionAggregation annotation with a pipeline object containing the collection name and an array of MongoDB aggregation stages:
import com.orbitalhq.mongo.CollectionAggregation
import com.orbitalhq.mongo.MongoService
import com.orbitalhq.mongo.Collection
@Collection(connection = "productsMongo", collection = "products")
model Product {
@Id
id: ProductId inherits String
name: ProductName inherits String
categoryId: CategoryId inherits String
price: Price inherits Decimal
}
@MongoService(connection = "productsMongo")
service ProductService {
@CollectionAggregation(
pipeline = {
collection: "products",
stages: [
'{ $match: { categoryId: :categoryId } }',
'{ $sort: { price: -1 } }'
]
}
)
operation getProductsByCategory(categoryId: CategoryId): Product[]
}
Multi-line aggregation stages
For complex aggregation stages, use triple-quoted strings (""") to span multiple lines for better readability:
@MongoService(connection = "productsMongo")
service ProductService {
@CollectionAggregation(
pipeline = {
collection: "products",
stages: [
'{ $sort: { price: -1 } }',
"""{ $facet: {
"results": [
{ $skip: :offset },
{ $limit: :pageSize }
],
"metadata": [
{ $count: "totalRecords" }
]
} }""",
"""{ $addFields: {
"meta": {
"totalRecords": { $arrayElemAt: ["$metadata.totalRecords", 0] },
"page": { $add: [{ $divide: [:offset, :pageSize] }, 1] },
"totalPages": { $ceil: { $divide: [{ $arrayElemAt: ["$metadata.totalRecords", 0] }, :pageSize] } }
}
} }"""
]
}
)
operation getProductsWithPagination(
offset: Int,
pageSize: Int
): ProductPage
}
Parameter binding
Parameters are bound using the :parameterName syntax and are automatically serialized safely to prevent injection attacks. All Taxi semantic types are supported:
@CollectionAggregation(
pipeline = {
collection: "orders",
stages: [
'{ $match: { customerId: :customerId, total: { $gte: :minTotal }, orderDate: { $gte: :fromDate } } }',
'{ $sort: { orderDate: -1 } }',
'{ $limit: :maxResults }'
]
}
)
operation getRecentHighValueOrders(
customerId: CustomerId,
minTotal: OrderTotal,
fromDate: OrderDate,
maxResults: Int
): Order[]
Common aggregation patterns
Grouping and summarization
@CollectionAggregation(
pipeline = {
collection: "orders",
stages: [
'{ $group: { _id: "$customerId", totalSpent: { $sum: "$total" }, orderCount: { $sum: 1 } } }',
'{ $project: { customerId: "$_id", totalSpent: 1, orderCount: 1, _id: 0 } }'
]
}
)
operation getCustomerSummaries(): CustomerSummary[]
Sorting with pagination
@CollectionAggregation(
pipeline = {
collection: "products",
stages: [
'{ $match: { inStock: true } }',
'{ $sort: { price: 1, name: 1 } }',
'{ $skip: :offset }',
'{ $limit: :pageSize }'
]
}
)
operation getProductsPaged(offset: Int, pageSize: Int): Product[]
Advanced pagination with metadata
model ProductPage {
results: Product[]
meta: PaginationMeta
}
model PaginationMeta {
totalRecords: Int
page: Int
totalPages: Int
}
@CollectionAggregation(
pipeline = {
collection: "products",
stages: [
'{ $sort: { price: -1 } }',
"""{ $facet: {
"results": [
{ $skip: :offset },
{ $limit: :pageSize }
],
"metadata": [
{ $count: "totalRecords" }
]
} }""",
"""{ $addFields: {
"meta": {
"totalRecords": { $arrayElemAt: ["$metadata.totalRecords", 0] },
"page": { $add: [{ $divide: [:offset, :pageSize] }, 1] },
"totalPages": { $ceil: { $divide: [{ $arrayElemAt: ["$metadata.totalRecords", 0] }, :pageSize] } }
}
} }"""
]
}
)
operation getProductsWithPagination(offset: Int, pageSize: Int): ProductPage
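The page and totalPages expressions in the $addFields stage reduce to simple arithmetic. The Python sketch below reproduces that arithmetic with made-up numbers so the results are easy to check by hand; the function name is hypothetical.

```python
import math

def pagination_meta(total_records, offset, page_size):
    """Mirror the arithmetic of the $addFields stage."""
    return {
        "totalRecords": total_records,
        # $add: [{ $divide: [offset, pageSize] }, 1]
        "page": offset / page_size + 1,
        # $ceil: { $divide: [totalRecords, pageSize] }
        "totalPages": math.ceil(total_records / page_size),
    }

meta = pagination_meta(total_records=45, offset=20, page_size=10)
print(meta)
```

Because the division is not truncated, an offset that is not a multiple of the page size gives a fractional page (an offset of 25 with a page size of 10 gives 3.5), so callers would normally pass offsets aligned to the page size.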
Collection joins with $lookup
@CollectionAggregation(
pipeline = {
collection: "orders",
stages: [
"""{ $lookup: {
from: "customers",
localField: "customerId",
foreignField: "_id",
as: "customer"
} }""",
'{ $unwind: "$customer" }',
"""{ $project: {
orderId: "$_id",
customerName: "$customer.name",
total: 1
} }"""
]
}
)
operation getOrdersWithCustomerDetails(): OrderWithCustomer[]
When to use native aggregations
Choosing between table operations and aggregations
Use @CollectionAggregation when you need:
- Complex data transformations beyond basic filtering and projection
- Performance-optimized aggregations that push computation to the database
- MongoDB-specific features like $facet, $lookup, or $unwind
- Custom calculated fields or conditional logic
- Advanced pagination with metadata
- Grouping, summarization, or statistical calculations
For simple queries like basic filtering (find { User[](FirstName == "John") }), prefer Orbital’s standard table operations which provide better portability and integration with the broader query system.
Multi-Collection Transactions with @MultiAggregation
The @MultiAggregation annotation allows you to execute multiple aggregation pipelines across different collections, optionally within a single MongoDB transaction.
This is particularly useful for complex data transformations that need to maintain consistency across multiple collections.
Transaction limitations
Basic syntax
import com.orbitalhq.mongo.MultiAggregation
import com.orbitalhq.mongo.MongoService
@MongoService(connection = "usersMongo")
service UserService {
@MultiAggregation(
transactional = true,
pipelines = [
{
collection: "user_events",
stages: [
'{ $match: { processed: false } }',
'{ $group: { _id: "$userId", eventCount: { $sum: 1 } } }'
]
},
{
collection: "user_stats",
stages: [
'{ $merge: { into: "user_stats", on: "_id", whenMatched: "merge" } }'
]
}
]
)
write operation processUserEvents(): ProcessingResult[]
}
Parameters
The @MultiAggregation annotation accepts the following parameters:
Parameter | Description | Default |
---|---|---|
pipelines | Array of AggregationPipeline objects, each containing a collection name and array of aggregation stages | - |
transactional | Boolean indicating whether the pipelines should execute within a MongoDB transaction. Set to false for operations that don’t require transactional guarantees or when using features not supported in MongoDB transactions | true |
AggregationPipeline structure
Each pipeline in the pipelines array has the following structure:
model AggregationPipeline {
collection: String // Name of the MongoDB collection
stages: String[] // Array of aggregation stage strings
}
Transaction considerations
MongoDB Transaction Limitations
Orbital does not validate that your pipeline complies with these restrictions. If you define a pipeline that violates MongoDB's transaction rules, an error will be thrown at runtime.
Consult the Mongo docs on transaction operations to understand what limitations exist.
When transactional = true, be aware that MongoDB transactions have limitations:
- Cannot use certain aggregation operators like $merge, $out, or $lookup with collections from different databases
- Cannot perform operations that would create new collections
- All collections must be in the same replica set or sharded cluster
Set transactional = false when:
- Using operations not supported in transactions (like $merge or $out)
- Performance is more important than strict consistency
- Working with operations that don’t require atomicity
Advanced example: Event stream processing with conditional merging
This example demonstrates a common pattern for processing event streams with conditional merging to avoid race conditions:
type UserId inherits String
type ProfileScore inherits Int
type IsEnriched inherits Boolean
type IsMerged inherits Boolean
@Collection(connection = "usersMongo", collection = "users_primary")
model User {
@Id
id: UserId
name: String
profileScore: ProfileScore?
isEnriched: IsEnriched = false
}
@Collection(connection = "usersMongo", collection = "user_enrichments")
model UserEnrichment {
@Id
userId: UserId
profileScore: ProfileScore
isMerged: IsMerged = false
}
@MongoService(connection = "usersMongo")
service UserService {
@MultiAggregation(
transactional = false, // Using $merge which isn't supported in transactions
pipelines = [
{
collection: "user_enrichments",
stages: [
'{ $match: { isMerged: false } }',
"""{ $lookup: {
from: "users_primary",
localField: "_id",
foreignField: "_id",
as: "user"
} }""",
'{ $match: { user: { $ne: [] } } }',
'{ $unwind: "$user" }',
"""{ $replaceRoot: {
newRoot: {
$mergeObjects: [
"$user",
{ profileScore: "$profileScore", isEnriched: true }
]
}
} }""",
"""{ $merge: {
into: "users_primary",
on: "_id",
whenMatched: "merge",
whenNotMatched: "discard"
} }"""
]
},
{
collection: "user_enrichments",
stages: [
'{ $match: { isMerged: false } }',
"""{ $lookup: {
from: "users_primary",
localField: "_id",
foreignField: "_id",
as: "user"
} }""",
'{ $match: { user: { $ne: [] } } }',
'{ $set: { isMerged: true } }',
"""{ $merge: {
into: "user_enrichments",
on: "_id",
whenMatched: "merge",
whenNotMatched: "discard"
} }"""
]
}
]
)
write operation mergePendingUserUpdates(): MergeResult[]
}
Parameter binding in transactions
Parameters work the same way in @MultiAggregation as in @CollectionAggregation, using the :parameterName syntax:
@MultiAggregation(
pipelines = [
{
collection: "orders",
stages: [
'{ $match: { customerId: :customerId, status: "pending" } }',
'{ $set: { status: "processing", updatedAt: new Date() } }'
]
},
{
collection: "order_audit",
stages: [
'{ $addFields: { customerId: :customerId, action: "status_update" } }',
'{ $merge: { into: "order_audit" } }'
]
}
]
)
operation processOrdersForCustomer(customerId: CustomerId): ProcessingResult[]
When to use @MultiAggregation
Use @MultiAggregation when you need to:
- Execute multiple related aggregation operations atomically
- Perform complex data transformations across multiple collections
- Implement event stream processing patterns with conditional merging
- Maintain data consistency across related collections
- Perform bulk operations that span multiple collections
Use @CollectionAggregation for single-collection operations, and prefer standard table operations for simple queries.
Sample mutating queries
Inserting data
This example shows inserting data into a Mongo collection.
Note that the objectId is null, allowing Mongo to assign an Id.
given { flight : FlightInfoWithObjectId = {
objectId : null ,
code : "TK 1989",
departure: "IST",
arrival: "LHR",
airline: { code: "TK", name: "Turkish Airlines", starAlliance: true}
}
}
call FlightsDb::upsertFlightWithObjectId
Updating data
given { flight : FlightInfoWithObjectId = {
objectId : "7df78ad8902ce46d" ,
code : "TK 1990",
departure: "IST",
arrival: "LHR",
airline: { code: "TK", name: "Turkish Airlines", starAlliance: true}
}
}
call FlightsDb::upsertFlightWithObjectId
Deleting a single document
given { user : User = { id: "user123", name: "John", email: "john@example.com" } }
call UserService::deleteUser
Deleting multiple documents
given {
users : User[] = [
{ id: "user123" },
{ id: "user456" }
]
}
call UserService::deleteUsers
Deleting with complex filtering
given {
categoryId: CategoryId = "electronics",
minPrice: Price = 500.00
}
call ProductService::deleteExpensiveInCategory
Streaming data from Kafka into Mongo
This example shows streaming stock price updates from a Kafka topic directly into Mongo, updating based on the symbol:
import com.orbitalhq.kafka.KafkaService
import com.orbitalhq.kafka.KafkaOperation
// Kafka model and service emitting prices:
model StockPrice {
symbol: StockSymbol inherits String
currentPrice : StockPrice inherits Decimal
}
@KafkaService( connectionName = "market-prices" )
service MyKafkaService {
stream prices : Stream<StockPrice>
}
// Mongo model and service for saving prices:
@Collection(connection = "stockPricesMongoDb", collection = "stockPrices")
closed parameter model SavedStockPrice {
@Id
symbol : StockSymbol
currentPrice : StockPrice
timestamp : Instant = now()
}
@MongoService( connection = "stockPricesMongoDb" )
service StockPricesMongoService {
table prices: SavedStockPrice[]
@UpsertOperation
write operation updatePrice(SavedStockPrice):SavedStockPrice
}
Given the above, the following query will save updated Kafka ticks into Mongo:
stream { StockPrice }
call StockPricesMongoService::updatePrice
Building a REST API that reads from Mongo
This is a full example, where we create an HTTP endpoint accepting a GET request with a ticker symbol.
We’ll use the same model and services declared in Streaming data from Kafka into Mongo, to avoid redeclaring them here.
@HttpOperation(url = "/api/q/stockPrices/{symbol}", method = "GET")
query FetchStockPrices(@PathVariable("symbol") symbol:StockSymbol) {
find { SavedStockPrice( StockSymbol == symbol) }
}
Deleting data from collections
Orbital supports two approaches for deleting data from MongoDB collections: simple delete operations based on model structure, and native MongoDB delete operations for advanced filtering.
Simple delete operations
Use the @DeleteOperation annotation for straightforward delete operations based on model IDs:
import com.orbitalhq.mongo.DeleteOperation
@MongoService(connection = "productsMongo")
service ProductService {
// Delete single document by ID
@DeleteOperation
write operation deleteOneProduct(Product): DeleteResult
// Delete multiple documents by ID array
@DeleteOperation
write operation deleteManyProducts(Product[]): DeleteResult
}
The operation signature determines the behavior:
- Single model parameter → deleteOne() operation using the @Id or @UniqueIndex field
- Array parameter → deleteMany() operation using an $in query on ID fields
Both operations return a DeleteResult model containing the count of deleted documents.
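The two cases can be pictured as the filter documents passed to deleteOne() and deleteMany(). The Python sketch below builds such filters as plain dictionaries. It assumes the model's @Id field is called id and is stored as _id in Mongo; the helper names are hypothetical and the exact filters Orbital sends are not shown in this document.

```python
def delete_one_filter(instance, model_field="id", mongo_field="_id"):
    """Filter matching the single document identified by the instance's id."""
    return {mongo_field: instance[model_field]}

def delete_many_filter(instances, model_field="id", mongo_field="_id"):
    """Filter matching every document whose id appears in the given instances."""
    ids = [instance[model_field] for instance in instances]
    return {mongo_field: {"$in": ids}}

one = delete_one_filter({"id": "laptop-1"})
many = delete_many_filter([{"id": "laptop-1"}, {"id": "phone-1"}])
print(one)
print(many)
```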
Native MongoDB delete operations
For complex delete operations that require advanced filtering, use the @DeleteByQuery annotation with native MongoDB filter syntax:
import com.orbitalhq.mongo.DeleteByQuery
@MongoService(connection = "productsMongo")
service ProductService {
@DeleteByQuery(
collection = "products",
filter = '{ categoryId: :categoryId, price: { $gte: :minPrice } }'
)
write operation deleteExpensiveInCategory(
categoryId: CategoryId,
minPrice: Price
): DeleteResult
}
Parameter binding in delete filters
Parameters use the :parameterName syntax and are automatically serialized safely to prevent injection attacks:
@DeleteByQuery(
collection = "products",
filter = '''{
$or: [
{ categoryId: :category1 },
{ price: { $lt: :maxPrice } }
]
}'''
)
write operation deleteByComplexCriteria(
category1: CategoryId,
maxPrice: Price
): DeleteResult
Multi-line delete filters
Use triple-quoted strings for complex filter expressions:
@DeleteByQuery(
collection = "orders",
filter = """{
$and: [
{ customerId: :customerId },
{ status: { $in: ["cancelled", "failed"] } },
{ createdDate: { $lt: :cutoffDate } }
]
}"""
)
write operation cleanupFailedOrders(
customerId: CustomerId,
cutoffDate: Date
): DeleteResult
Delete operation examples
Delete all documents (empty filter):
@DeleteByQuery(
collection = "temp_data",
filter = '{}'
)
write operation clearTempData(): DeleteResult
Simple ID-based deletion:
given { Product = { id: "laptop-1" } }
call ProductService::deleteOneProduct
Bulk deletion by IDs:
given {
Product[] = [
{ id: "laptop-1" },
{ id: "phone-1" }
]
}
call ProductService::deleteManyProducts
Conditional deletion with multiple criteria:
given {
categoryId: CategoryId = "electronics",
minPrice: Price = 500.00
}
call ProductService::deleteExpensiveInCategory
When to use each approach
Use @DeleteOperation when:
- Deleting specific documents by their ID or unique index
- Working with arrays of models to delete multiple specific documents
- You want type-safe, rename-proof operations tied to your model structure
Use @DeleteByQuery when:
- You need complex filtering criteria involving multiple fields
- Using MongoDB query operators ($gte, $or, $in, etc.)
- Deleting conditionally based on calculated values or ranges
- Running bulk deletions that can’t be expressed as simple ID lookups