Managing data sources

Working with Mongo

Orbital can both read data from a Mongo collection to fetch data as part of a query, and write data back to Mongo.

Defining a Mongo connection

Mongo connections are stored in your connections.conf config file, under the mongo element.

The connection specifies how to connect to a Mongo Database.

mongo {
   usersMongo {
      connectionName=usersMongo
      connectionParameters {
        dbName = "sample_mflix"
        connectionString = "mongodb+srv://orbital:PASSWORD@orbital.xxxx.mongodb.net/?retryWrites=true&w=majority&appName=Orbital"
      }
   }
}

The following configuration options are mandatory under the connectionParameters

Config optionPurpose
dbNameName of the Mongo Database.
connectionStringA Valid Mongo Connection String (see Mongo Db Connection Strings for details.)

Defining a Collection mapping

Collections are exposed to Orbital using the annotation @com.orbitalhq.mongo.Collection on a model.

Fields names in the model are expected to align with field names from the collection.

Here’s an example:

import com.orbitalhq.mongo.Collection

type FirstName inherits String
type Password inherits String
type Email inherits String

@Collection(connection = "usersMongo", collection = "users")
model User {
      name : FirstName
      password : Password
      email: Email
}

The @Collection annotation contains the following parameters:

ParameterDescription
connectionThe name of a connection, as defined in your connections configuration file.
collectionThe name of the collection

It’s possible to use environment variables in these annotations, as described here.

Defining a mongo service

// 1: Add the required imports
import com.orbitalhq.mongo.MongoService

// 2: Annotate the service as a `MongoService`.
@MongoService( connectionName = "usersMongo" )
service MongoUsersService {
   // 3: expose a table operation for a collection model.
   table user : User[]
}                             

Mapping the ObjectId

Use an @Id annotation to define the column that represents the Mongo ObjectId.

Here’s an example:

import com.orbitalhq.mongo.Collection

type FirstName inherits String
type Password inherits String
type Email inherits String
type MongoObjectId inherits String

@Collection(connection = "usersMongo", collection = "users")
model UserWithObjectId {
     @Id
     _id: MongoObjectId
     name : FirstName
     password : Password
     email: Email
}

In MongoDB, every document must have an _id field that serves as the primary key.

If you don’t provide one, MongoDB will automatically generate an ObjectId for this field.

While you can use other fields as identifiers in your application, the _id field will always be present and serves as the document’s unique identifier within the collection.

Orbital will let you assign an alternative name for the _id field in your model, but this isn’t recommended, as it can lead to confusion when examining the actual collection, or building native pipelines.

You can assign a different name to the column in your model by simply changing the name of the field:

@Collection(connection = "usersMongo", collection = "users")
model User {
     @Id
     userId: MongoObjectId
     name : FirstName
     password : Password
     email: Email
}

When querying data from User, results will show the _id column named userId.

However, it’s recommended that you use projections in a TaxiQL query to rename fields, rather than introducing mapping in your model.

Avoid renaming the id field

Renaming the _id field in your model can lead to confusion, as it's the only field that doesn't align with what the column is actually named in your Mongo collection. We do not reccomend doing this.

Alternatively omit the _id field from your Taxi model, and use a @UniqueIndex if you have one defined. Read more about Unique indexes in mongo

Mapping a Mongo Unique Index

Use the @UniqueIndex annotation to map a Mongo unique index to your Taxi model.

Here is an example:

import com.orbitalhq.mongo.Collection
import com.orbitalhq.mongo.UniqueIndex

@Collection(connection = "accountsMongo", collection = "accounts")
 model Account {
   @UniqueIndex
   accountId: AccountId inherits String
   accountName: AccountName inherits String
}

The above example annotates the accountId field with @UniqueIndex, which indicates that the underlying mongo collection accounts has a Mongo unique index defined on it.

Orbital doesn't create the index

Marking a field with a @UniqueIndex annotation does not create the index on the database - it's expected the index already exists.

Orbital considers the field annotated with @UniqueIndex when executing upsert operations against the Mongo collection. Consider the following service definition for the above model:

@MongoService( connection = "accountsMongo" )
service AccountsDb {
   table accounts : Account[]
   @UpsertOperation
   write operation upsertAccount(Account):Account   
}

When we issue the following query:

query.taxi
 given { account : Account = { accountId : "1" , accountName: "My USD Account" } }
 call AccountsDb::upsertAccount

Orbital will insert an account with accountId = "1" into the mongo accounts collection. When the following query is issued subsequently:

query.taxi
 given { account : Account = { accountId : "1" , accountName: "My GBP Account" } }
 call AccountsDb::upsertAccount

Orbital will use the @UniqueIndex and update the existing account data with the new accountName value rather than attempting to insert a new account into the Mongo collection.

Id vs UniqueIndex

The choice between @Id and @UniqueIndex depends on the design of your Mongo document.

Mongo always creates a field called _id, which is a primary key. You can store any string-based value in this field, provided it is unique:

@Collection(connection = "accountsMongo", collection = "accounts")
 model Account {
   @Id
   _id: AccountId inherits String
   accountName: AccountName inherits String
}

If you’d rather use a field named something other than _id for your primary key, then use @UniqueIndex:

import com.orbitalhq.mongo.UniqueIndex

@Collection(connection = "accountsMongo", collection = "accounts")
 model Account {
   @UniqueIndex
   accountId: AccountId inherits String
   accountName: AccountName inherits String
}

Other considerations:

  • If a model declares a @UniqueIndex, this field is used for matching on upsert operations
  • Do not combine both @Id and @UniqueIndex

Mongo setOnInsert Support

To leverage $setOnInsert for @Upsert operations, add the @SetOnInsert annotation on the field that needs to be set only when inserting a new document into the corresponding Mongo Collection.

Here is an example:

import com.orbitalhq.mongo.Collection
import com.orbitalhq.mongo.UniqueIndex
import com.orbitalhq.mongo.SetOnInsert

 @Collection(connection = "accountsMongo", collection = "accounts")
 model Account {
   @UniqueIndex
   accountId : AccountId inherits String
   currency : Currency inherits String
   @SetOnInsert
   insertedAt: InsertedAt inherits Instant = now()
   updatedAt: UpdateAt inherits Instant = now()
}

This model indicates that:

  • insertedAt should be set when a new Account is inserted into the accounts collection
  • subsequent updates to the same account document should not change the initial insertedAt value.

Querying Collections

To expose a Mongo database as a source for queries, the database must have a service and table operation exposed for a collection.

// 1: Add the required imports
import com.orbitalhq.mongo.MongoService

// 2: Annotate the service as a `MongoService`.
@MongoService( connectionName = "usersMongo" )
service MongoUsersService {
   // 3: expose a table operation for a collection model.
   table user : User[]
   table mongoUsers: UserWithObjectId[]
}                             

The @MongoService annotation contains the following parameters:

ParameterDescription
connectionThe name of a connection, as defined in yourconnections configuration file.

Sample queries

Fetch everything from a collection

find { User[] }

Fetch values by criteria

find { User[]( FirstName == "Harry" ) }
find { User[]( FirstName == "Harry" || FirstName == "Joe" ) }

Using IN and NOT IN operators

MongoDB collections support in and not in operators for efficient filtering against multiple values. These operators work with any field type and generate optimized MongoDB queries.

Filtering with IN operator

To find documents where a field matches any value in a list:

// Find users with specific IDs
find { User[]( UserId in [1, 2, 3] ) }
// Find users from specific countries
find { User[]( Country in ["US", "UK", "CA"] ) }

Filtering with NOT IN operator

To find documents where a field does not match any value in a list:

// Find users excluding specific IDs
find { User[]( UserId not in [1, 2] ) }
// Find users not from specific countries
find { User[]( Country not in ["US", "UK"] ) }

Loading documents based on data from other services

A powerful pattern is to use in operators with data fetched from other services. This enables efficient bulk lookups:

// Given these models:

model Family {
  id : FamilyId inherits Int
  members : PersonId[]  // Array of person IDs
}


service FamilyApi {
  operation getFamily(FamilyId):Family
}

@Collection(connection = "usersMongo", collection = "people")
model Person {
  @Id
  id : PersonId inherits Int
  name : Name inherits String
}

@MongoService( connection = "usersMongo" )
service UsersDb {
  table people : Person[]
}

You can fetch family data from an API, then load all family members from MongoDB in a single query:

// Load all people belonging to family ID 1
given { FamilyId = 1 }
find { Person[]( PersonId in Family::PersonId[] ) }

This pattern works with any data transformation. For example, if family members are nested objects:

model FamilyMember {
  id : PersonId
}

model Family {
  id : FamilyId inherits Int
  members : FamilyMember[]
}

// Extract person IDs from nested objects and load matching people
given { FamilyId = 1 }
find { Person[]( PersonId in Family::FamilyMember[].map( (FamilyMember) -> PersonId ) ) }

Combining with other conditions

IN and NOT IN operators can be combined with other MongoDB query operators:

// Find active users from specific countries
find { User[]( Country in ["US", "UK", "CA"] && Status == "ACTIVE" ) }
// Find users either VIP or from specific regions
find { User[]( UserType == "VIP" || Country in ["US", "UK"] ) }

Writing data to a collection

To expose a database collection for writes, you need to provide a write operation in a service.

Here’s a complete example schema with corresponding write operations:

type FlightCode inherits String
type DepartureTime inherits Instant
type DepartureAirport inherits String
type ArrivalAirport inherits String
type MongoObjectId inherits String

type AirlineCode inherits String
type AirlineName inherits String
type StarAllianceMember inherits Boolean

model Airline {
   code: AirlineCode
   name: AirlineName
   starAlliance: StarAllianceMember
}

@Collection(connection = "flightsMongo", collection = "flightInfo")
model FlightInfo {
   code: FlightCode
   depTime : DepartureTime
   arrival: ArrivalAirport
   airline: Airline
}

@Collection(connection = "flightsMongo", collection = "flightInfo")
model FlightInfoWithObjectId {
   @Id
   objectId: MongoObjectId?
   code: FlightCode
   departure: DepartureAirport
   arrival: ArrivalAirport
   airline: Airline
}

@MongoService( connection = "flightsMongo" )
service FlightsDb {
   table FlightInfo : FlightInfo[]
   table mongoFlights: FlightInfoWithObjectId[]

   // This is effectively Insert as the FlightInfo does not have @Id annotation.
   @UpsertOperation
   write operation insertFlight(FlightInfo):FlightInfo

   // If objectId field is populated, this will update the matching item in the collection.
   // Otherwise it will insert that provided  FlightInfoWithObjectId instance into the collection.
   @UpsertOperation
   write operation upsertFlightWithObjectId(FlightInfoWithObjectId):FlightInfoWithObjectId
}

Batching Upsert Operations

Rather than upserting one record at a time, you can batch upsert operations, just update your @UpsertOperation to specify batch settings:

@UpsertOperation(batchSize = 100, batchDuration = 10000)
write operation upsertFlightWithObjectId(FlightInfoWithObjectId):FlightInfoWithObjectId
  • batchDuration is specified in milliseconds
  • batchSize specifies the number of records to batch before perfomring a write

When UpsertOperation annotation has these attributes defined, Orbital buffers upserts and flushes the buffer to Mongo each time the buffer reaches batchSize or batchDuration elapses.

Deleting data from a collection

Orbital supports two types of delete operations for MongoDB collections: simple deletes using @DeleteOperation and complex filtered deletes using @DeleteByQuery.

Simple Delete Operations

Use @DeleteOperation for straightforward deletes that target documents by their @Id or @UniqueIndex fields.

type DeleteCount inherits Int

model DeleteResult {
   deletedCount: DeleteCount
}

@MongoService(connection = "usersMongo")
service UserService {
   @DeleteOperation
   write operation deleteUser(User): DeleteResult
   
   @DeleteOperation
   write operation deleteUsers(User[]): DeleteResult
}

The @DeleteOperation annotation supports both single document and multiple document deletes:

  • Single delete: Pass a single model instance. Orbital will delete the document matching the @Id or @UniqueIndex field.
  • Multiple delete: Pass an array of model instances. Orbital will delete all documents that match the @Id or @UniqueIndex fields of the provided instances.

Requirements for Delete Operations:

  • Your model must have either an @Id or @UniqueIndex annotation
  • The return type should be DeleteResult to get information about how many documents were deleted

Complex Delete Operations with @DeleteByQuery

Use @DeleteByQuery for advanced delete operations that require complex filtering using MongoDB query syntax.

@MongoService(connection = "productsMongo")
service ProductService {
   @DeleteByQuery(
      collection = "products",
      filter = '{ categoryId: :categoryId, price: { $gte: :minPrice } }'
   )
   write operation deleteExpensiveInCategory(
      categoryId: CategoryId,
      minPrice: Price
   ): DeleteResult
   
   @DeleteByQuery(
      collection = "products", 
      filter = '{}'
   )
   write operation deleteAllProducts(): DeleteResult
}

The @DeleteByQuery annotation contains the following parameters:

ParameterDescription
collectionThe name of the MongoDB collection to delete from
filterMongoDB filter query using standard MongoDB query syntax. Use :parameterName for parameter substitution

Parameter Substitution: Parameters in the filter query are substituted using the :parameterName syntax. Orbital will replace these placeholders with the actual parameter values from your operation.

MongoDB Query Operators: You can use any valid MongoDB query operators in your filter:

@DeleteByQuery(
   collection = "products",
   filter = '''{ 
      $or: [
         { categoryId: :category1 },
         { price: { $lt: :maxPrice } }
      ]
   }'''
)
write operation deleteByComplexCriteria(
   category1: CategoryId,
   maxPrice: Price
): DeleteResult

Native MongoDB aggregations

For complex data transformations that go beyond Orbital’s standard table operations, you can use native MongoDB aggregation pipelines as an “escape hatch”. This allows you to leverage the full power of MongoDB’s aggregation framework while maintaining type safety and semantic integration with your Orbital queries.

Basic syntax

Use the @CollectionAggregation annotation with a pipeline object containing the collection name and an array of MongoDB aggregation stages:

import com.orbitalhq.mongo.CollectionAggregation
import com.orbitalhq.mongo.MongoService
import com.orbitalhq.mongo.Collection

@Collection(connection = "productsMongo", collection = "products")
model Product {
   @Id
   id: ProductId inherits String
   name: ProductName inherits String
   categoryId: CategoryId inherits String
   price: Price inherits Decimal
}

@MongoService(connection = "productsMongo")
service ProductService {
   @CollectionAggregation(
      pipeline = {
         collection: "products",
         stages: [
            '{ $match: { categoryId: :categoryId } }',
            '{ $sort: { price: -1 } }'
         ]
      }
   )
   operation getProductsByCategory(categoryId: CategoryId): Product[]
}

Multi-line aggregation stages

For complex aggregation stages, use triple-quoted strings (""") to span multiple lines for better readability:

@MongoService(connection = "productsMongo")
service ProductService {
   @CollectionAggregation(
      pipeline = {
         collection: "products",
         stages: [
            '{ $sort: { price: -1 } }',
            """{ $facet: {
               "results": [
                  { $skip: :offset },
                  { $limit: :pageSize }
               ],
               "metadata": [
                  { $count: "totalRecords" }
               ]
            } }""",
            """{ $addFields: {
               "meta": {
                  "totalRecords": { $arrayElemAt: ["$metadata.totalRecords", 0] },
                  "page": { $add: [{ $divide: [:offset, :pageSize] }, 1] },
                  "totalPages": { $ceil: { $divide: [{ $arrayElemAt: ["$metadata.totalRecords", 0] }, :pageSize] } }
               }
            } }"""
         ]
      }
   )
   operation getProductsWithPagination(
      offset: Int,
      pageSize: Int
   ): ProductPage
}

Parameter binding

Parameters are bound using the :parameterName syntax and are automatically serialized safely to prevent injection attacks. All Taxi semantic types are supported:

@CollectionAggregation(
   pipeline = {
     collection: "orders",
     stages: [
        '{ $match: { customerId: :customerId, total: { $gte: :minTotal }, orderDate: { $gte: :fromDate } } }',
        '{ $sort: { orderDate: -1 } }',
        '{ $limit: :maxResults }'
     ]
   }
)
operation getRecentHighValueOrders(
   customerId: CustomerId,
   minTotal: OrderTotal,
   fromDate: OrderDate,
   maxResults: Int
): Order[]

Common aggregation patterns

Grouping and summarization

@CollectionAggregation(
   pipeline = {
     collection: "orders",
     stages: [
        '{ $group: { _id: "$customerId", totalSpent: { $sum: "$total" }, orderCount: { $sum: 1 } } }',
        '{ $project: { customerId: "$_id", totalSpent: 1, orderCount: 1, _id: 0 } }'
     ]
   }
)
operation getCustomerSummaries(): CustomerSummary[]

Sorting with pagination

@CollectionAggregation(
   pipeline = {
     collection: "products", 
     stages: [
        '{ $match: { inStock: true } }',
        '{ $sort: { price: 1, name: 1 } }',
        '{ $skip: :offset }',
        '{ $limit: :pageSize }'
     ]
   }
)
operation getProductsPaged(offset: Int, pageSize: Int): Product[]

Advanced pagination with metadata

model ProductPage {
   results: Product[]
   meta: PaginationMeta
}

model PaginationMeta {
   totalRecords: Int
   page: Int
   totalPages: Int
}

@CollectionAggregation(
   pipeline = {
     collection: "products",
     stages: [
        '{ $sort: { price: -1 } }',
        """{ $facet: {
           "results": [
              { $skip: :offset },
              { $limit: :pageSize }
           ],
           "metadata": [
              { $count: "totalRecords" }
           ]
        } }""",
        """{ $addFields: {
           "meta": {
              "totalRecords": { $arrayElemAt: ["$metadata.totalRecords", 0] },
              "page": { $add: [{ $divide: [:offset, :pageSize] }, 1] },
              "totalPages": { $ceil: { $divide: [{ $arrayElemAt: ["$metadata.totalRecords", 0] }, :pageSize] } }
           }
        } }"""
     ]
   }
)
operation getProductsWithPagination(offset: Int, pageSize: Int): ProductPage

Collection joins with $lookup

@CollectionAggregation(
   pipeline = {
     collection: "orders",
     stages = [
        """{ $lookup: {
           from: "customers",
           localField: "customerId",
           foreignField: "_id", 
           as: "customer"
        } }""",
        '{ $unwind: "$customer" }',
        """{ $project: {
           orderId: "$_id",
           customerName: "$customer.name",
           total: 1
        } }"""
     ]
   }
)
operation getOrdersWithCustomerDetails(): OrderWithCustomer[]

When to use native aggregations

Choosing between table operations and aggregations

Use `@CollectionAggregation` for complex transformations that require MongoDB-specific features. For simple filtering and projection, prefer Orbital's standard table operations for better portability.

Use @CollectionAggregation when you need:

  • Complex data transformations beyond basic filtering and projection
  • Performance-optimized aggregations that push computation to the database
  • MongoDB-specific features like $facet, $lookup, or $unwind
  • Custom calculated fields or conditional logic
  • Advanced pagination with metadata
  • Grouping, summarization, or statistical calculations

For simple queries like basic filtering (find { User[](FirstName == "John") }), prefer Orbital’s standard table operations which provide better portability and integration with the broader query system.

Multi-Collection Transactions with @MultiAggregation

The @MultiAggregation annotation allows you to execute multiple aggregation pipelines across different collections - optionally within a single MongoDB transaction.

This is particularly useful for complex data transformations that need to maintain consistency across multiple collections.

Transaction limitations

There are limits around what Mongo can do within a transaction. See Transaction Considerations for more details.

Basic syntax

import com.orbitalhq.mongo.MultiAggregation
import com.orbitalhq.mongo.MongoService

@MongoService(connection = "usersMongo")
service UserService {
   @MultiAggregation(
      transactional = true,
      pipelines = [
         {
            collection: "user_events",
            stages: [
               '{ $match: { processed: false } }',
               '{ $group: { _id: "$userId", eventCount: { $sum: 1 } } }'
            ]
         },
         {
            collection: "user_stats",
            stages: [
               '{ $merge: { into: "user_stats", on: "_id", whenMatched: "merge" } }'
            ]
         }
      ]
   )
   write operation processUserEvents(): ProcessingResult[]
}

Parameters

The @MultiAggregation annotation accepts the following parameters:

ParameterDescriptionDefault
pipelinesArray of AggregationPipeline objects, each containing a collection name and array of aggregation stages-
transactionalBoolean indicating whether the pipelines should execute within a MongoDB transaction. Set to false for operations that don’t require transactional guarantees or when using features not supported in MongoDB transactionstrue

AggregationPipeline structure

Each pipeline in the pipelines array has the following structure:

model AggregationPipeline {
   collection: String    // Name of the MongoDB collection
   stages: String[]      // Array of aggregation stage strings
}

Transaction considerations

MongoDB Transaction Limitations

MongoDB has specific limitations on what operations can be performed within transactions.
Orbital does not validate that your pipeline complies with these restrictions. If you define a pipeline that violates MongoDB's transaction rules, an error will be thrown at runtime.
Consult the Mongo docs on transaction operations to understand what limitations exist.

When transactional = true, be aware that MongoDB transactions have limitations:

  • Cannot use certain aggregation operators like $merge, $out, or $lookup with collections from different databases
  • Cannot perform operations that would create new collections
  • All collections must be in the same replica set or sharded cluster

Set transactional = false when:

  • Using operations not supported in transactions (like $merge or $out)
  • Performance is more important than strict consistency
  • Working with operations that don’t require atomicity

Advanced example: Event stream processing with conditional merging

This example demonstrates a common pattern for processing event streams with conditional merging to avoid race conditions:

type UserId inherits String
type ProfileScore inherits Int
type IsEnriched inherits Boolean
type IsMerged inherits Boolean

@Collection(connection = "usersMongo", collection = "users_primary")
model User {
   @Id
   id: UserId
   name: String
   profileScore: ProfileScore?
   isEnriched: IsEnriched = false
}

@Collection(connection = "usersMongo", collection = "user_enrichments")
model UserEnrichment {
   @Id
   userId: UserId
   profileScore: ProfileScore
   isMerged: IsMerged = false
}

@MongoService(connection = "usersMongo")
service UserService {
   @MultiAggregation(
      transactional = false,  // Using $merge which isn't supported in transactions
      pipelines = [
         {
            collection: "user_enrichments",
            stages: [
               '{ $match: { isMerged: false } }',
               """{ $lookup: {
                  from: "users_primary",
                  localField: "_id",
                  foreignField: "_id",
                  as: "user"
               } }""",
               '{ $match: { user: { $ne: [] } } }',
               '{ $unwind: "$user" }',
               """{ $replaceRoot: {
                  newRoot: {
                     $mergeObjects: [
                        "$user",
                        { profileScore: "$profileScore", isEnriched: true }
                     ]
                  }
               } }""",
               """{ $merge: {
                  into: "users_primary",
                  on: "_id",
                  whenMatched: "merge",
                  whenNotMatched: "discard"
               } }"""
            ]
         },
         {
            collection: "user_enrichments", 
            stages: [
               '{ $match: { isMerged: false } }',
               """{ $lookup: {
                  from: "users_primary",
                  localField: "_id", 
                  foreignField: "_id",
                  as: "user"
               } }""",
               '{ $match: { user: { $ne: [] } } }',
               '{ $set: { isMerged: true } }',
               """{ $merge: {
                  into: "user_enrichments",
                  on: "_id",
                  whenMatched: "merge",
                  whenNotMatched: "discard"
               } }"""
            ]
         }
      ]
   )
   write operation mergePendingUserUpdates(): MergeResult[]
}

Parameter binding in transactions

Parameters work the same way in @MultiAggregation as in @CollectionAggregation, using the :parameterName syntax:

@MultiAggregation(
   pipelines = [
      {
         collection: "orders",
         stages: [
            '{ $match: { customerId: :customerId, status: "pending" } }',
            '{ $set: { status: "processing", updatedAt: new Date() } }'
         ]
      },
      {
         collection: "order_audit",
         stages: [
            '{ $addFields: { customerId: :customerId, action: "status_update" } }',
            '{ $merge: { into: "order_audit" } }'
         ]
      }
   ]
)
operation processOrdersForCustomer(customerId: CustomerId): ProcessingResult[]

When to use @MultiAggregation

Use @MultiAggregation when you need to:

  • Execute multiple related aggregation operations atomically
  • Perform complex data transformations across multiple collections
  • Implement event stream processing patterns with conditional merging
  • Maintain data consistency across related collections
  • Perform bulk operations that span multiple collections

Use @CollectionAggregation for single-collection operations, and prefer standard table operations for simple queries.

Sample mutating queries

Inserting data

This example shows inserting data into a Mongo collection.

Note that the objectId is null, allowing Mongo to assign an Id.

given { movie : FlightInfoWithObjectId = { 
    objectId : null , 
    code : "TK 1989", 
    departure: "IST", 
    arrival: "LHR", 
    airline: { code: "TK", name: "Turkish Airlines", starAlliance: true} 
  } 
}
call FlightsDb::upsertFlightWithObjectId

Updating data

given { movie : FlightInfoWithObjectId = { 
    objectId : "7df78ad8902ce46d" , 
    code : "TK 1990", 
    departure: "IST", 
    arrival: "LHR", 
    airline: { code: "TK", name: "Turkish Airlines", starAlliance: true} 
  } 
}
call FlightsDb::upsertFlightWithObjectId

Deleting a single document

given { user : User = { id: "user123", name: "John", email: "john@example.com" } }
call UserService::deleteUser

Deleting multiple documents

given { 
   users : User[] = [
      { id: "user123" },
      { id: "user456" }
   ]
}
call UserService::deleteUsers

Deleting with complex filtering

given {
   categoryId: CategoryId = "electronics",
   minPrice: Price = 500.00
}
call ProductService::deleteExpensiveInCategory

Streaming data from Kafka into Mongo

This example shows streaming stock price updates from a Kafka topic directly into Mongo, updating based off the symbol

import com.orbitalhq.kafka.KafkaService
import com.orbitalhq.kafka.KafkaOperation

// Kafka model and service emitting prices:
model StockPrice {
  symbol: StockSymbol inherits String
  currentPrice : StockPrice inherits Decimal
}

@KafkaService( connectionName = "market-prices" )
service MyKafkaService {
  stream prices : Stream<StockPrice>
}


// Mongo model and service for saving prices:
@Collection(connection = "stockPricesMongoDb", collection = "stockPrices")
closed parameter model SavedStockPrice {
   @Id
   symbol : StockSymbol
   currentPrice : StockPrice
   timestamp : Instant = now()
}

@MongoService( connection = "stockPricesMongoDb" )
service StockPricesMongoService {
   table prices: SavedStockPrice[]

   @UpsertOperation
   write operation updatePrice(SavedStockPrice):SavedStockPrice
}

Given the above, the following query will save updated Kafka ticks into Mongo:

stream { StockPrice }
call StockPricesMongoService::updatePrice

Building a REST API that reads from Mongo

This is a full example, where we create an HTTP endpoint accepting a GET request with a ticker symbol.

We’ll use the same model and services declared in Streaming data from Kafka to Mongo, to avoid redeclaring them here.

@HttpOperation(url = "/api/q/stockPrices/{symbol}", method = "GET")
query FetchStockPrices(@PathVariable("symbol") symbol:StockSymbol) {
  find { SavedStockPrice( StockSymbol == symbol) } 
}

Deleting data from collections

Orbital supports two approaches for deleting data from MongoDB collections: simple delete operations based on model structure, and native MongoDB delete operations for advanced filtering.

Simple delete operations

Use the @DeleteOperation annotation for straightforward delete operations based on model IDs:

import com.orbitalhq.mongo.DeleteOperation

@MongoService(connection = "productsMongo")
service ProductService {
   // Delete single document by ID
   @DeleteOperation
   write operation deleteOneProduct(Product): DeleteResult
   
   // Delete multiple documents by ID array
   @DeleteOperation  
   write operation deleteManyProducts(Product[]): DeleteResult
}

The operation signature determines the behavior:

  • Single model parameterdeleteOne() operation using @Id or @UniqueIndex field
  • Array parameterdeleteMany() operation using $in query on ID fields

Both operations return a DeleteResult model containing the count of deleted documents.

Native MongoDB delete operations

For complex delete operations that require advanced filtering, use the @DeleteByQuery annotation with native MongoDB filter syntax:

import com.orbitalhq.mongo.DeleteByQuery

@MongoService(connection = "productsMongo") 
service ProductService {
   @DeleteByQuery(
      collection = "products",
      filter = '{ categoryId: :categoryId, price: { $gte: :minPrice } }'
   )
   write operation deleteExpensiveInCategory(
      categoryId: CategoryId,
      minPrice: Price
   ): DeleteResult
}

Parameter binding in delete filters

Parameters use the :parameterName syntax and are automatically serialized safely to prevent injection attacks:

@DeleteByQuery(
   collection = "products",
   filter = '''{ 
      $or: [
         { categoryId: :category1 },
         { price: { $lt: :maxPrice } }
      ] 
   }'''
)
write operation deleteByComplexCriteria(
   category1: CategoryId,
   maxPrice: Price  
): DeleteResult

Multi-line delete filters

Use triple-quoted strings for complex filter expressions:

@DeleteByQuery(
   collection = "orders",
   filter = """{ 
      $and: [
         { customerId: :customerId },
         { status: { $in: ["cancelled", "failed"] } },
         { createdDate: { $lt: :cutoffDate } }
      ]
   }"""
)
write operation cleanupFailedOrders(
   customerId: CustomerId,
   cutoffDate: Date
): DeleteResult

Delete operation examples

Delete all documents (empty filter):

@DeleteByQuery(
   collection = "temp_data",
   filter = '{}'
)
write operation clearTempData(): DeleteResult

Simple ID-based deletion:

given { Product = { id: "laptop-1" } }
call ProductService::deleteOneProduct

Bulk deletion by IDs:

given { 
   Product[] = [
      { id: "laptop-1" },
      { id: "phone-1" }
   ] 
}
call ProductService::deleteManyProducts

Conditional deletion with multiple criteria:

given {
   categoryId: CategoryId = "electronics"
   minPrice: Price = 500.00
}
call ProductService::deleteExpensiveInCategory

When to use each approach

Use @DeleteOperation when:

  • Deleting specific documents by their ID or unique index
  • Working with arrays of models to delete multiple specific documents
  • You want type-safe, rename-proof operations tied to your model structure

Use @DeleteByQuery when:

  • Complex filtering criteria involving multiple fields
  • Using MongoDB query operators ($gte, $or, $in, etc.)
  • Conditional deletion based on calculated values or ranges
  • Bulk deletion operations that can’t be expressed as simple ID lookups
Previous
Databases
Next
Kafka