Managing data sources
Using AWS services
Before you begin
Before using AWS services, you need to define a connection to AWS in your connections.conf
file.
Read more about defining AWS connections here.
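As a rough sketch, an AWS connection entry in connections.conf looks something like the following (the connection name my-aws-account is illustrative; see the AWS connections docs for the full set of options):
aws {
   my-aws-account {
      connectionName=my-aws-account
      accessKey=${?AWS_ACCESS_KEY_ID}
      secretKey=${?AWS_SECRET_ACCESS_KEY}
      region=${AWS_REGION}
   }
}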
DynamoDb
DynamoDb services can only be declared in Taxi.
Querying against a key is supported, as are write operations.
To expose a DynamoDb service, the following two annotations are required:
- Add a @com.orbitalhq.aws.dynamo.Table annotation to a model
- Add a @com.orbitalhq.aws.dynamo.DynamoService annotation to the service
import com.orbitalhq.aws.dynamo.DynamoService
import com.orbitalhq.aws.dynamo.Table
// connectionName must match the defined connection.
@Table( connectionName = "myAws" , tableName = "reviews" )
model Review {
@Id
movieId : MovieId
score: ReviewScore inherits Int
}
@DynamoService
service DynamoService {
// Exposes query operations that return Review[]
table reviews: Review[]
// allows upsert calls
@UpsertOperation
write operation saveReview(Review):Review
}
Write operations (such as upserts) can only be invoked in mutations.
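As a sketch, using the model and service above (with illustrative values, and a hypothetical IncomingReview model served by another source in the schema), a key lookup and an upsert might look like this:
// Key lookup: Orbital queries the reviews table using the @Id field
given { movieId : MovieId = 123 }
find { Review }

// Upsert: a mutation that reads reviews from another source, then
// writes each one to DynamoDb via the write operation
find { IncomingReview[] }
call DynamoService::saveReview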
Lambda
Orbital can invoke a Lambda, either to discover data in a query, or in a mutation.
A service must have the AwsLambdaService annotation, passing the configured connection. Operations have the LambdaOperation annotation.
import com.orbitalhq.aws.lambda.AwsLambdaService
import com.orbitalhq.aws.lambda.LambdaOperation
@AwsLambdaService( connectionName = "myAws" )
service MovieDb {
@LambdaOperation(name = "streamingprovider")
operation movieQuery(@RequestBody StreamingProviderRequest): StreamingProviderResponse
}
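As a sketch, a query asking for a StreamingProviderResponse causes Orbital to invoke the Lambda, constructing the StreamingProviderRequest from the facts supplied in the query (the MovieTitle fact here is purely illustrative):
// Orbital invokes the streamingprovider Lambda to resolve this query,
// building the request from the given fact
given { title : MovieTitle = "Star Wars" }
find { StreamingProviderResponse }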
S3
Orbital can use S3 as a source for both reading and writing data.
Reading from S3
To declare a data source that exposes data, take the following steps:
- Declare a service annotated with @S3Service
- Within that service, declare an operation with @S3Operation, providing the bucket name
- The operation should take a single parameter, of type FilenamePattern, indicating the filename (or pattern) to read from
Remember the imports S3Service, S3Operation and FilenamePattern (as shown below), or use their fully qualified names.
Here's an example:
import com.orbitalhq.aws.s3.S3Service
import com.orbitalhq.aws.s3.S3Operation
import com.orbitalhq.aws.s3.FilenamePattern
@S3Service(connectionName = "MyAwsConnection")
service AwsBucketService {
@S3Operation(bucket = "MyBucket")
operation readBucket(filename:FilenamePattern = "films.csv"):Film[]
}
Filename patterns when reading from S3
When reading files from S3, you can either specify a single filename (e.g., films.csv), or a pattern - such as film*.csv, *.csv, or even just * to read everything in the bucket.
When working with a pattern, Orbital will read and combine all matching files, treating them as a single response.
For information about supported file formats, see file formats.
Examples - Reading from S3
Each of the below examples will work with the following model of content stored on S3, so we’re defining it once here for brevity:
import com.orbitalhq.formats.Csv
import com.orbitalhq.aws.s3.S3Service
import com.orbitalhq.aws.s3.S3Operation
import com.orbitalhq.aws.s3.FilenamePattern
// Define the format we're reading.
// This is a CSV file, so the @Csv annotation is used.
// (Note it's also imported at the top of the file)
@Csv
model TradeSummary {
symbol : Symbol inherits String
open : OpenPrice inherits Decimal
high : HighPrice inherits Decimal
close : ClosePrice inherits Decimal
}
Reading a single file
@S3Service(connectionName = "MyAwsConnection")
service AwsBucketService {
@S3Operation(bucket = "MyTrades")
operation readBucket(filename:FilenamePattern = "trades.csv"):TradeSummary[]
}
With the above schema, we can issue a simple query to return the contents of trades.csv:
find { TradeSummary[] }
Reading the contents of multiple files
@S3Service(connectionName = "MyAwsConnection")
service AwsBucketService {
@S3Operation(bucket = "MyTrades")
// This will read all files ending in csv present in the bucket
operation readBucket(filename:FilenamePattern = "*.csv"):TradeSummary[]
}
With the above schema, we can issue a simple query to return the contents of all *.csv files in the bucket:
find { TradeSummary[] }
Reading files and exposing as an HTTP endpoint
Using the same service definition as shown above, we can expose the contents of our *.csv files with a query published as an HTTP endpoint:
import taxi.http.HttpOperation
@HttpOperation(url = "/api/q/trades", method = "GET")
find { TradeSummary[] }
For more information, read our guide on publishing queries as endpoints.
Reading files and publishing to Kafka
This example shows how to read a CSV file from S3, and publish each row as an individual message to Kafka, as a JSON object.
First, we’ll declare our Kafka broker and associated message format:
import com.orbitalhq.kafka.KafkaService
import com.orbitalhq.kafka.KafkaOperation
// The message format we're publishing to Kafka.
// Because there's no format defined, it's JSON by default
model TradeSummaryEvent {
ticker : Symbol
// field names and structure are different, but the
// types are the same as on our source model.
prices: {
openPrice : OpenPrice
highPrice : HighPrice
closePrice : ClosePrice
}
}
// Declare our Kafka service and operation
@KafkaService( connectionName = "market-prices" )
service MyKafkaService {
// Define an operation that writes to Kafka
@KafkaOperation( topic = "tradeRecords" )
write operation publishTrades(TradeSummaryEvent):TradeSummaryEvent
}
With the above in place, we can write a query that reads from S3, transforms from CSV to our JSON format, and writes it out to Kafka.
find { TradeSummary[] }
call MyKafkaService::publishTrades
In the above example, Orbital detects that the inbound model (TradeSummary) is different from the destination format (TradeSummaryEvent) and handles the transformation for us.
In our example, that's simply converting from CSV to JSON and restructuring the message. However, the transformation could be richer, doing tasks such as calling services to discover data.
Finally, we might want to expose an HTTP POST operation to trigger this update:
import taxi.http.HttpOperation
@HttpOperation(url = "/api/q/publishTradeUpdates", method = "POST")
find { TradeSummary[] }
call MyKafkaService::publishTrades
For more information about working with Kafka, including defining connections to brokers, see our dedicated docs on Kafka.
Reading files and saving to a Database
This example shows how to read a CSV file from S3, and save each row as a record to a database.
First, we'll define our database table and associated service:
import com.orbitalhq.jdbc.Table
import com.orbitalhq.jdbc.DatabaseService
import com.orbitalhq.jdbc.InsertOperation
@Table(connection = "trades-database", schema = "public" , table = "trades" )
model TradeSummaryRecord {
symbol : Symbol
open : OpenPrice
high : HighPrice
close : ClosePrice
timestamp : Instant = now()
}
@DatabaseService(connection = "trades-database")
service TradesDatabase {
@InsertOperation
write operation saveTradeSummary(TradeSummaryRecord):TradeSummaryRecord
}
With the above in place, we can write a query that reads from S3, transforms from CSV to our database format, and performs the database inserts:
find { TradeSummary[] }
call TradesDatabase::saveTradeSummary
In the above example, Orbital detects that the inbound model (TradeSummary) is different from the destination format (TradeSummaryRecord) and handles the transformation for us.
In our example, that's simply converting CSV rows into database records and restructuring the data. However, the transformation could be richer, doing tasks such as calling services to discover data.
Finally, we might want to expose an HTTP POST operation to trigger this update:
import taxi.http.HttpOperation
@HttpOperation(url = "/api/q/publishTradeUpdates", method = "POST")
find { TradeSummary[] }
call TradesDatabase::saveTradeSummary
For more information about working with databases, including defining connections and support for different database types, see our dedicated docs on Databases.
Writing to S3
To declare an operation that can write data to S3, take the following steps:
- Declare a service annotated with @S3Service
- Within that service, declare a write operation with @S3Operation, providing the bucket name
- The operation should take two parameters:
  - One with a @RequestBody annotation, which contains the contents to be written
  - One of type FilenamePattern, which defines the filename to write to
Remember the imports S3Service, S3Operation, RequestBody and FilenamePattern (as shown below), or use their fully qualified names.
Here's an example:
import com.orbitalhq.aws.s3.S3Service
import com.orbitalhq.aws.s3.S3Operation
import com.orbitalhq.aws.s3.FilenamePattern
import com.orbitalhq.aws.s3.RequestBody
@S3Service(connectionName = "MyAwsConnection")
service AwsBucketService {
@S3Operation(bucket = "MyBucket")
write operation writeToS3(@RequestBody films:Film[], filename:FilenamePattern = "films.csv"):Film[]
}
Filename patterns when writing to S3
When writing to S3, filename patterns are not supported (unlike when reading).
If you declare a filename with a pattern, an error will be thrown.
Examples - Writing to S3
Each of the below examples will work with the following model of content stored on S3, so we’re defining it once here for brevity:
import com.orbitalhq.formats.Csv
// Define the format we're writing.
// This is a CSV file, so the @Csv annotation is used.
// (Note it's also imported at the top of the file)
@Csv
model TradeSummary {
symbol : Symbol inherits String
open : OpenPrice inherits Decimal
high : HighPrice inherits Decimal
close : ClosePrice inherits Decimal
}
Fetching from an API and writing the results to S3 as a CSV
This example shows data fetched from a REST API (exposed as JSON) and stored in S3.
As part of the operation, we'll transform a tree-like JSON structure into a flattened CSV file.
First, we'll define the API and its response object:
import taxi.http.HttpOperation
model StockPriceUpdate {
ticker : Symbol
// field names and structure are different, but the
// types are the same as on our source model.
prices: {
openPrice : OpenPrice
highPrice : HighPrice
closePrice : ClosePrice
}
}
service ApiService {
@HttpOperation(url="http://myApi.com/prices", method = "GET")
operation getPrices():StockPriceUpdate[]
}
And, we’ll define a write operation on S3 to store the content:
import com.orbitalhq.aws.s3.S3Service
import com.orbitalhq.aws.s3.S3Operation
import com.orbitalhq.aws.s3.FilenamePattern
import com.orbitalhq.aws.s3.RequestBody
@S3Service( connectionName = "myAwsConnection" )
service AwsBucketService {
@S3Operation(bucket = "trades")
write operation writeTradeSummary(@RequestBody payload: TradeSummary[], filename: FilenamePattern = "trades.csv"):TradeSummary[]
}
Given the above, we can use the following query to read from our API, transform the data, and write to our S3 bucket:
find { StockPriceUpdate[] }
call AwsBucketService::writeTradeSummary
This will result in the data returned from our API call being converted to CSV and written to trades.csv in our S3 bucket.
If we’d like to set the filename within our query, we could:
given { filename : FilenamePattern = 'todaysTrades.csv' }
find { StockPriceUpdate[] }
call AwsBucketService::writeTradeSummary
This time, the output is written to todaysTrades.csv.
S3 File formats
In the above examples, our content has been stored in S3 using CSV.
This is defined because the model used in our operations is annotated with @Csv, as shown in the following excerpt:
import com.orbitalhq.formats.Csv
@Csv
model TradeSummary {
// ... omitted
}
@S3Service( connectionName = "myAwsConnection" )
service AwsBucketService {
// reading CSV
@S3Operation(bucket = "MyTrades")
// This operation returns a collection of
// TradeSummary objects, which are defined with @Csv
operation readBucket(filename:FilenamePattern = "*.csv"):TradeSummary[]
// writing CSV
@S3Operation(bucket = "trades")
write operation writeTradeSummary(
// The request body is a collection
// of trade summaries, which are configured as CSV
@RequestBody payload: TradeSummary[],
filename: FilenamePattern = "trades.csv"
):TradeSummary[]
}
The format can be any supported format, such as Avro, XML, CSV (or any other character-delimited file), or even Protobuf.
If no format is defined, JSON is used as the default.
For more information, see Data formats.
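For example, a model declared without any format annotation (a minimal sketch) would be read from and written to S3 as JSON:
// No format annotation, so JSON is used by default
model TradeSummaryJson {
   symbol : Symbol
   open : OpenPrice
   high : HighPrice
   close : ClosePrice
}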
SQS
Consuming events
Orbital can subscribe to a stream of data from SQS.
import com.orbitalhq.aws.sqs.SqsService
import com.orbitalhq.aws.sqs.SqsOperation
@SqsService( connectionName = "moviesConnection" )
service MovieService {
@SqsOperation( queue = "movies" )
operation streamMovieQuery():Stream<Movie>
}
You can also control the visibility duration time and poll time in your corresponding aws connection configuration:
aws {
my-aws-account {
connectionName=my-aws-account
accessKey=${?AWS_ACCESS_KEY_ID}
secretKey=${?AWS_SECRET_ACCESS_KEY}
region=${AWS_REGION}
connectionParameters {
// SQS receive duration (in seconds): how long received messages are hidden from subsequent retrieve requests after being retrieved by a ReceiveMessage request. A default value of 300 is used if omitted.
sqsReceiveRequestWaitTime=500
// SQS receive message request wait time, in seconds. A default value of 10 is used if not specified.
sqsReceiveRequestWaitTime=10
}
}
}
This can then be queried using a standard stream query:
stream { Movie }
// as ...
Publishing events
Orbital can publish to a queue using a mutation:
import com.orbitalhq.aws.sqs.SqsService
import com.orbitalhq.aws.sqs.SqsOperation
@SqsService( connectionName = "moviesConnection" )
service MovieService {
@SqsOperation( queue = "movies" )
write operation publishMovieEvent(Movie):Movie
}
Publishing events can only be invoked in mutations.
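As a sketch (assuming Movie data is discoverable from some other source defined elsewhere in the schema), a mutation that publishes to the queue might look like this:
// Reads movies from another source, then publishes each one
// to the movies queue via the write operation
find { Movie[] }
call MovieService::publishMovieEvent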
Example: Consuming from one SQS queue, and publishing to another
import com.orbitalhq.aws.sqs.SqsService
import com.orbitalhq.aws.sqs.SqsOperation
@SqsService( connectionName = "moviesConnection" )
service MovieService {
@SqsOperation(queue = "newReleases" )
operation newReleases():Stream<Movie>
@SqsOperation( queue = "moviesToReview" )
write operation publishMovieEvent(Movie):Movie
}
// Query: consume from the new releases queue, and publish to
// a "movies to review" queue
stream { Movie }
call MovieService::publishMovieEvent