Azure Service Bus as a data source

Orbital can both read messages from Azure Service Bus topics/queues to stream data as part of a query, and write messages back to Service Bus topics/queues.

Defining a connection to Azure Service Bus

Service Bus connections are stored in your connections.conf config file, under the serviceBus element.

The connection specifies how to connect to a Service Bus namespace, but not the details of individual topics or queues. Those are specified on the Service Bus operation annotations in your Taxi model.

serviceBus {
   warehouseConnection {
      connectionName=warehouseConnection
      connectionParameters {
         endPoint="mynamespace.servicebus.windows.net/"
         sharedAccessKeyName="RootManageSharedAccessKey"
         sharedAccessKey="${SERVICE_BUS_ACCESS_KEY}"
      }
   }
   
   anotherConnection {
      connectionName=anotherConnection
      connectionParameters {
         endPoint="another-namespace.servicebus.windows.net/"
         sharedAccessKeyName="RootManageSharedAccessKey"
         sharedAccessKey="${ANOTHER_SERVICE_BUS_KEY}"
      }
   }
}

The following configuration options are supported under the connectionParameters:

Config option       | Purpose
endPoint            | The Service Bus namespace endpoint URL
sharedAccessKeyName | The Shared Access Signature (SAS) key name
sharedAccessKey     | The Shared Access Signature (SAS) key value

It's recommended to use environment variables for sensitive values like the shared access key.
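
For orientation, the three parameters correspond to the parts of a standard Azure Service Bus SAS connection string. The Python sketch below is not Orbital code and makes no claim about how Orbital builds its client internally; the helper name build_connection_string is hypothetical. It reads the key from the same environment variable the config above refers to.

```python
import os


def build_connection_string(end_point: str, key_name: str, key: str) -> str:
    """Assemble an Azure Service Bus SAS connection string.

    Hypothetical helper for illustration only; not part of Orbital.
    """
    host = end_point
    # Accept the endpoint with or without the sb:// scheme and trailing slash.
    if host.startswith("sb://"):
        host = host[len("sb://"):]
    host = host.rstrip("/")
    return (
        f"Endpoint=sb://{host}/;"
        f"SharedAccessKeyName={key_name};"
        f"SharedAccessKey={key}"
    )


if __name__ == "__main__":
    # SERVICE_BUS_ACCESS_KEY mirrors the variable referenced in connections.conf.
    access_key = os.environ.get("SERVICE_BUS_ACCESS_KEY", "<not set>")
    print(build_connection_string(
        "mynamespace.servicebus.windows.net/",
        "RootManageSharedAccessKey",
        access_key,
    ))
```

Keeping the key out of the file and in the environment means the same connections.conf can be committed and reused across environments.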

Streaming from Azure Service Bus

You can stream messages from both Service Bus queues and topics using Orbital.

Exposing a queue or topic subscription

Service Bus queues and topics are declared in Taxi as operations which return a Stream of data.

// 1: Add the required imports
import com.orbitalhq.azure.servicebus.ServiceBusService
import com.orbitalhq.azure.servicebus.ServiceBusQueueOperation
import com.orbitalhq.azure.servicebus.ServiceBusTopicSubscriptionOperation

// Define your message model
model Person {
   name : Name inherits String
}

// 2: Annotate the service as a `ServiceBusService`
@ServiceBusService(connectionName = "warehouseConnection")
service PersonServiceBusService {
   // 3a: Stream from a queue
   @ServiceBusQueueOperation(queue = "people-queue")
   stream streamFromQueue : Stream<Person>
   
   // 3b: Stream from a topic subscription
   @ServiceBusTopicSubscriptionOperation(topic = "people-topic", subscription = "all-people")
   stream streamFromTopic : Stream<Person>
}

  1. Add the required imports
  2. Annotate the service with @ServiceBusService
  3. Annotate the operations as either:
     • @ServiceBusQueueOperation for queues, specifying the queue name
     • @ServiceBusTopicSubscriptionOperation for topics, specifying both the topic name and subscription name
  4. The return type should be a Stream<> of the defined message type.

Example queries

Basic streaming from Service Bus

// Stream messages from a Service Bus queue
stream { Person }

This will consume messages from the people-queue queue declared in the PersonServiceBusService service.

Enrich data from Service Bus with other sources

Data from Service Bus can be projected to enrich it with data from other sources. Any field requested in the projection that is not present in the Service Bus message payload is looked up from other sources, using Orbital’s standard projections.

stream { Person } as {
  name : Name  // available in the Service Bus message
  address : Address // looked up from another data source
}[]
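
Conceptually, the projection above behaves like the following Python sketch: each incoming message keeps the fields it already carries, and missing fields are fetched per message. This is only an analogy. Orbital discovers the lookup from your Taxi types, whereas here the lookup function and its keying by name are hypothetical and hard-coded.

```python
from typing import Callable, Dict, Iterable, Iterator


def enrich(
    people: Iterable[Dict[str, str]],
    lookup_address: Callable[[str], str],
) -> Iterator[Dict[str, str]]:
    """Yield each person with an address added (illustrative only)."""
    for person in people:
        yield {
            # Present in the Service Bus message.
            "name": person["name"],
            # Not in the message, so fetched from another source.
            "address": lookup_address(person["name"]),
        }


if __name__ == "__main__":
    # Stand-in for another data source; the contents are made up.
    addresses = {"John Doe": "1 Example Street"}
    for row in enrich([{"name": "John Doe"}], lambda name: addresses[name]):
        print(row)
```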

Filtering Service Bus streams

This example reads all messages from Service Bus, but only emits those whose name starts with 'J':

stream { Person.filterEach( ( Person ) -> Person::Name.startsWith('J') ) }

Streaming from Service Bus to a database

Streams from Service Bus can be inserted into a database (or any other writable source) using a mutating query:

// First, ensure that your data destination exposes a writable data source
service DatabaseService {
   @UpsertOperation
   write operation savePerson(Person):Person
}

// Then, define a streaming query that pipes messages to the database
stream { Person }
call DatabaseService::savePerson

Publishing to Azure Service Bus

To make a queue or topic writable, declare a write operation in a Service Bus service:

// Add the required imports
import com.orbitalhq.azure.servicebus.ServiceBusService
import com.orbitalhq.azure.servicebus.ServiceBusQueueOperation
import com.orbitalhq.azure.servicebus.ServiceBusTopicPublicationOperation

// Define your message model
model Person {
   name : Name inherits String
}

@ServiceBusService(connectionName = "warehouseConnection")
service PersonServiceBusService {
   // Publish to a queue
   @ServiceBusQueueOperation(queue = "people-queue")
   write operation publishToQueue(Person):Person
   
   // Publish to a topic
   @ServiceBusTopicPublicationOperation(topic = "people-topic")
   write operation publishToTopic(Person):Person
}

Batch publishing

Service Bus supports publishing messages in batches to improve performance. To enable batching, add the batchSize and batchDuration parameters to the operation annotation:

@ServiceBusService(connectionName = "warehouseConnection")
service PersonServiceBusService {
   // Publish to a topic with batching
   @ServiceBusTopicPublicationOperation(
      topic = "people-topic", 
      batchSize = 5,      // Number of messages in a batch
      batchDuration = 10000  // Maximum time to wait for batch in milliseconds
   )
   write operation publishToTopic(Person):Person
}

When batching is configured:

  1. Messages are accumulated until either:
     • The batch size is reached
     • The batch duration time elapses
  2. The accumulated messages are then sent as a single batch to Service Bus
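
The flush-on-size-or-time rule described above can be sketched in Python as follows. This is a simplified model, not Orbital's implementation: the Batcher class, its tick method, and the injected clock are hypothetical, and it assumes the duration is measured from the first message in the current batch.

```python
from typing import Callable, List


class Batcher:
    """Buffers messages and flushes on size or elapsed time (illustrative only)."""

    def __init__(
        self,
        batch_size: int,
        batch_duration_ms: int,
        send: Callable[[List[str]], None],
        clock_ms: Callable[[], int],
    ) -> None:
        self.batch_size = batch_size
        self.batch_duration_ms = batch_duration_ms
        self.send = send
        self.clock_ms = clock_ms
        self.buffer: List[str] = []
        self.batch_started_ms = 0

    def add(self, message: str) -> None:
        if not self.buffer:
            # Assumption: the timer starts with the first message of a batch.
            self.batch_started_ms = self.clock_ms()
        self.buffer.append(message)
        if len(self.buffer) >= self.batch_size:
            self.flush()

    def tick(self) -> None:
        """Call periodically; flushes once the batch duration has elapsed."""
        elapsed = self.clock_ms() - self.batch_started_ms
        if self.buffer and elapsed >= self.batch_duration_ms:
            self.flush()

    def flush(self) -> None:
        if self.buffer:
            self.send(self.buffer)
            self.buffer = []


if __name__ == "__main__":
    now = [0]  # fake clock so the example is deterministic
    sent: List[List[str]] = []
    batcher = Batcher(5, 10000, sent.append, lambda: now[0])
    for i in range(7):
        batcher.add(f"msg-{i}")  # the fifth message triggers a size flush
    now[0] = 10000
    batcher.tick()  # the remaining two are flushed by the duration rule
    print([len(batch) for batch in sent])  # prints [5, 2]
```

With batchSize = 5 and batchDuration = 10000, a burst of seven messages produces one full batch immediately and a second, smaller batch once ten seconds have passed.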

Example use cases

Publishing a static message to Service Bus

given { person : Person = 
  {
    name : 'John Doe'
  } 
}
call PersonServiceBusService::publishToTopic

Consuming from one Service Bus entity and publishing to another

To stream data from a Service Bus queue/topic, transform it, and republish:

// EnrichedPerson is assumed to be defined elsewhere in your Taxi model
@ServiceBusService(connectionName = "warehouseConnection")
service TransformationService {
   @ServiceBusQueueOperation(queue = "raw-people")
   stream rawPeople : Stream<Person>

   @ServiceBusTopicPublicationOperation(topic = "enriched-people")
   write operation publishEnrichedPerson(EnrichedPerson):EnrichedPerson
}

The following query consumes from the raw-people queue and transforms each message to an EnrichedPerson, invoking any other services required to supply the missing data:

stream { Person }
// Each incoming Person message is transformed to an EnrichedPerson payload
// and published to the enriched-people topic
call TransformationService::publishEnrichedPerson

Building a REST API that publishes to Service Bus

This example creates an HTTP endpoint accepting a POST request to publish a message to Service Bus:

type PersonName inherits String

// The inbound request sent over HTTP
model PersonRequest {
  name : PersonName
}

// The message we'll publish to Service Bus
parameter model Person {
  name : PersonName
}

@ServiceBusService(connectionName = "warehouseConnection")
service PersonService {
   @ServiceBusTopicPublicationOperation(topic = "people-topic")
   write operation publishPerson(Person):Person
}

@HttpOperation(path = "/api/q/publishPerson", method = "POST")
query PublishPersonQuery(@RequestBody request:PersonRequest) {
  given { request }
  call PersonService::publishPerson
}

The above example works as follows:

  • A POST request is sent to /api/q/publishPerson with a body containing a name
  • The query calls publishPerson, and Orbital constructs the Person payload from the incoming PersonRequest
  • The message is published to the Service Bus topic people-topic
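
A client call to this endpoint might look like the Python sketch below. The base URL (http://localhost:9022) is a placeholder for wherever your Orbital instance is reachable, and the JSON body shape is an assumption based on the PersonRequest model; build_publish_request is a hypothetical helper.

```python
import json
import urllib.request


def build_publish_request(base_url: str, name: str) -> urllib.request.Request:
    """Build (but do not send) the POST request for the publishPerson query."""
    # Assumed body shape: a JSON object matching the PersonRequest model.
    body = json.dumps({"name": name}).encode("utf-8")
    return urllib.request.Request(
        url=base_url.rstrip("/") + "/api/q/publishPerson",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


if __name__ == "__main__":
    # Placeholder host and port; adjust to your deployment.
    request = build_publish_request("http://localhost:9022", "John Doe")
    print(request.get_method(), request.full_url)
    # To send it against a running instance:
    # with urllib.request.urlopen(request) as response:
    #     print(response.status)
```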

Advanced features

Message properties and metadata

Service Bus messages carry metadata properties, such as MessageId and CorrelationId, alongside the payload. Orbital supports reading and setting these properties through annotations:

import com.orbitalhq.azure.servicebus.ServiceBusMessageProperty

model Person {
   name : Name inherits String
   
   @ServiceBusMessageProperty(name = "MessageId")
   messageId : String
   
   @ServiceBusMessageProperty(name = "CorrelationId")
   correlationId : String
}

When consuming messages, these properties will be populated from the incoming message. When publishing messages, these properties will be set on the outgoing message.

Error handling and dead-letter queues

Service Bus includes built-in dead-letter queue functionality. To control how many times a message is attempted before it is dead-lettered, set maxDeliveryCount on the operation annotation:

@ServiceBusService(connectionName = "warehouseConnection")
service PersonService {
   @ServiceBusTopicSubscriptionOperation(
      topic = "people-topic", 
      subscription = "all-people",
      maxDeliveryCount = 3  // Number of processing attempts before message is dead-lettered
   )
   stream streamPeople : Stream<Person>
}
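
The effect of maxDeliveryCount can be modelled with the Python sketch below. In the real service, redelivery and dead-lettering are performed by the Service Bus broker, not by application code; the function name and the in-memory lists here are hypothetical and exist only to show the counting rule.

```python
from typing import Callable, List, Tuple


def process_with_dead_letter(
    messages: List[str],
    handler: Callable[[str], None],
    max_delivery_count: int,
) -> Tuple[List[str], List[str]]:
    """Return (processed, dead_lettered) after retrying each message."""
    processed: List[str] = []
    dead_lettered: List[str] = []
    for message in messages:
        for _ in range(max_delivery_count):
            try:
                handler(message)
                processed.append(message)
                break
            except Exception:
                # Processing failed; the broker would redeliver the message.
                continue
        else:
            # Every allowed delivery failed, so the message is dead-lettered.
            dead_lettered.append(message)
    return processed, dead_lettered


if __name__ == "__main__":
    def handler(message: str) -> None:
        if message == "bad":
            raise ValueError("cannot process")

    result = process_with_dead_letter(["ok", "bad"], handler, max_delivery_count=3)
    print(result)  # prints (['ok'], ['bad'])
```

With maxDeliveryCount = 3, a message that fails three times in a row ends up in the dead-letter queue, while one that succeeds on any earlier attempt is processed normally.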