Managing data sources
Azure Service Bus as a data source
Orbital can both read messages from Azure Service Bus topics/queues to stream data as part of a query, and write messages back to Service Bus topics/queues.
Defining a connection to Azure Service Bus
Service Bus connections are stored in your connections.conf
config file, under the serviceBus
element.
The connection specifies how to connect to a Service Bus namespace - but not the specifics of individual topics or queues. (Those are specified on the Service Bus operation annotations in your Taxi model.)
serviceBus {
warehouseConnection {
connectionName=warehouseConnection
connectionParameters {
endPoint="mynamespace.servicebus.windows.net/"
sharedAccessKeyName="RootManageSharedAccessKey"
sharedAccessKey="${SERVICE_BUS_ACCESS_KEY}"
}
}
anotherConnection {
connectionName=anotherConnection
connectionParameters {
endPoint="another-namespace.servicebus.windows.net/"
sharedAccessKeyName="RootManageSharedAccessKey"
sharedAccessKey="${ANOTHER_SERVICE_BUS_KEY}"
}
}
}
The following configuration options are supported under the connectionParameters
:
Config option | Purpose |
---|---|
endPoint | The Service Bus namespace endpoint URL |
sharedAccessKeyName | The Shared Access Signature (SAS) key name |
sharedAccessKey | The Shared Access Signature (SAS) key value |
Streaming from Azure Service Bus
You can stream messages from both Service Bus queues and topics using Orbital.
Exposing a queue or topic subscription
Service Bus queues and topics are declared in Taxi as operations which return a Stream
of data.
// 1: Add the required imports
import com.orbitalhq.azure.servicebus.ServiceBusService
import com.orbitalhq.azure.servicebus.ServiceBusQueueOperation
import com.orbitalhq.azure.servicebus.ServiceBusTopicSubscriptionOperation
// Define your message model
model Person {
name : Name inherits String
}
// 2: Annotate the service as a `ServiceBusService`
@ServiceBusService(connectionName = "warehouseConnection")
service PersonServiceBusService {
// 3a: Stream from a queue
@ServiceBusQueueOperation(queue = "people-queue")
stream streamFromQueue : Stream<Person>
// 3b: Stream from a topic subscription
@ServiceBusTopicSubscriptionOperation(topic = "people-topic", subscription = "all-people")
stream streamFromTopic : Stream<Person>
}
- Add the required imports
- Annotate the service with
@ServiceBusService
- The
connectionName
parameter should match a connection defined in the connections config file.
- Annotate the operations as either:
@ServiceBusQueueOperation
for queues, specifying the queue name@ServiceBusTopicSubscriptionOperation
for topics, specifying both the topic name and subscription name
- The return type should be a
Stream<>
of the defined message type.
Example queries
Basic streaming from Service Bus
// Stream messages from a Service Bus queue
stream { Person }
This will consume messages from the people-queue
queue declared in the PersonServiceBusService
service.
Enrich data from Service Bus with other sources
Data from Service Bus can be projected to enrich it with data from other sources. Data requested that is not present in the Service Bus message payload is looked up from other sources, using Orbital’s standard projections.
stream { Person } as {
name : Name // available in the Service Bus message
address : Address // looked up from another data source
}[]
Filtering Service Bus streams
This example reads all messages from the Service Bus, but only emits those with a specific name pattern:
stream { Person.filterEach( ( Person ) -> Person::Name.startsWith('J') ) }
Streaming from Service Bus to a database
Streams from Service Bus can be inserted into a database (or any other writable source) using a mutating query:
// First, ensure that your data destination exposes a writeable data source
service DatabaseService {
@UpsertOperation
write operation savePerson(Person):Person
}
// Then, define a streaming query that pipes messages to the database
stream { Person }
call DatabaseService::savePerson
Publishing to Azure Service Bus
To make a queue or topic writable, declare a write operation
in a Service Bus service:
// Add the required imports
import com.orbitalhq.azure.servicebus.ServiceBusService
import com.orbitalhq.azure.servicebus.ServiceBusQueueOperation
import com.orbitalhq.azure.servicebus.ServiceBusTopicPublicationOperation
// Define your message model
model Person {
name : Name inherits String
}
@ServiceBusService(connectionName = "warehouseConnection")
service PersonServiceBusService {
// Publish to a queue
@ServiceBusQueueOperation(queue = "people-queue")
write operation publishToQueue(Person):Person
// Publish to a topic
@ServiceBusTopicPublicationOperation(topic = "people-topic")
write operation publishToTopic(Person):Person
}
Batch publishing
Service Bus supports publishing messages in batches to improve performance. To enable batching, add the batchSize
and batchDuration
parameters to the operation annotation:
@ServiceBusService(connectionName = "warehouseConnection")
service PersonServiceBusService {
// Publish to a topic with batching
@ServiceBusTopicPublicationOperation(
topic = "people-topic",
batchSize = 5, // Number of messages in a batch
batchDuration = 10000 // Maximum time to wait for batch in milliseconds
)
write operation publishToTopic(Person):Person
}
When batching is configured:
- Messages are accumulated until either:
- The batch size is reached
- The batch duration time elapses
- The accumulated messages are then sent as a single batch to Service Bus
Example use cases
Publishing a static message to Service Bus
given { person : Person =
{
name : 'John Doe'
}
}
call PersonServiceBusService::publishToTopic
Consuming from one Service Bus entity and publishing to another
To stream data from a Service Bus queue/topic, transform it, and republish:
@ServiceBusService(connectionName = "warehouseConnection")
service TransformationService {
@ServiceBusQueueOperation(queue = "raw-people")
stream rawPeople : Stream<Person>
@ServiceBusTopicPublicationOperation(topic = "enriched-people")
write operation publishEnrichedPerson(EnrichedPerson):EnrichedPerson
}
The following query will consume from the raw-people
queue, and for each message, transform it to an EnrichedPerson
, invoking any other services required to inject the necessary data:
stream { Person }
// Each incoming Person message is transformed to an EnrichedPerson payload
// and published to the enriched-people topic
call TransformationService::publishEnrichedPerson
Building a REST API that publishes to Service Bus
This example creates an HTTP endpoint accepting a POST
request to publish a message to Service Bus:
type PersonName inherits String
// The inbound request sent over HTTP
model PersonRequest {
name : PersonName
}
// The message we'll publish to Service Bus
parameter model Person {
name : PersonName
}
@ServiceBusService(connectionName = "warehouseConnection")
service PersonService {
@ServiceBusTopicPublicationOperation(topic = "people-topic")
write operation publishPerson(Person):Person
}
@HttpOperation(path = "/api/q/publishPerson", method = "POST")
query PublishPersonQuery(@RequestBody request:PersonRequest) {
given { request }
call PersonService::publishPerson
}
The above example works as follows:
- A
POST
request is sent to/api/q/publishPerson
with a body containing a name - The query calls
publishPerson
, which constructs aPerson
from the request - The message is published to the Service Bus topic
people-topic
Advanced features
Message properties and metadata
Service Bus provides several methods to add metadata to messages. Orbital supports accessing and setting these properties through annotations:
import com.orbitalhq.azure.servicebus.ServiceBusMessageProperty
model Person {
name : Name inherits String
@ServiceBusMessageProperty(name = "MessageId")
messageId : String
@ServiceBusMessageProperty(name = "CorrelationId")
correlationId : String
}
When consuming messages, these properties will be populated from the incoming message. When publishing messages, these properties will be set on the outgoing message.
Error handling and dead-letter queues
Service Bus includes built-in dead-letter queue functionality. When errors occur during message processing, you can configure Orbital to handle these scenarios:
@ServiceBusService(connectionName = "warehouseConnection")
service PersonService {
@ServiceBusTopicSubscriptionOperation(
topic = "people-topic",
subscription = "all-people",
maxDeliveryCount = 3 // Number of processing attempts before message is dead-lettered
)
stream streamPeople : Stream<Person>
}