Managing data sources

Using AWS services

Before you begin

Before using AWS services, you need to define a connection to AWS in your connections.conf file.

Read more about defining AWS connections here.

Orbital supports reading protobuf messages - either as a message format for HTTP endpoints or from a streaming query originating from Kafka.

You can annotate protobuf sources directly with Taxi metadata, or you can import through the Schema Importer.

DynamoDb

DynamoDb services can only be declared in Taxi.

Querying by key is supported, as are write operations.

To expose a DynamoDb service, the following two annotations are required:

  • Add a @com.orbitalhq.aws.dynamo.Table annotation to a model
  • Add a @com.orbitalhq.aws.dynamo.DynamoService annotation to the service

import com.orbitalhq.aws.dynamo.DynamoService
import com.orbitalhq.aws.dynamo.Table

// connectionName must match the defined connection.
@Table( connectionName = "myAws" , tableName = "reviews" )
model Review {
  @Id
  movieId : MovieId
  score: ReviewScore inherits Int
}

@DynamoService
service DynamoService {
  // Exposes query operations that return Review[]
  table reviews: Review[]

  // allows upsert calls
  @UpsertOperation
  write operation saveReview(Review):Review
}

Write operations (such as upserts) can only be invoked in mutations.
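
As a rough illustration of querying by key, the query below looks up reviews using a constraint on the @Id field. It is a sketch rather than a definitive reference: it assumes MovieId is declared elsewhere as a String-based type, and the value "movie-123" is a made-up identifier.

```taxi
// Assumption: MovieId is declared elsewhere, e.g. `type MovieId inherits String`
// "movie-123" is a hypothetical key value, used for illustration only

// Look up reviews whose key (the @Id field) matches the given value
find { Review[]( MovieId == "movie-123" ) }
```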

Lambda

Orbital can invoke a Lambda, either to discover data in a query, or in a mutation.

A service must have the AwsLambdaService annotation, passing the configured connection.

Operations have the LambdaOperation annotation.

import com.orbitalhq.aws.lambda.AwsLambdaService
import com.orbitalhq.aws.lambda.LambdaOperation

@AwsLambdaService( connectionName = "myAws" )
service MovieDb {
  @LambdaOperation(name = "streamingprovider")
  operation movieQuery(@RequestBody StreamingProviderRequest): StreamingProviderResponse
}
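
The schema above does not show the request and response models, so the following is only a sketch of how a query might end up invoking the Lambda. The model fields (filmId, providerName) and their types are hypothetical, and the example assumes the request is declared as a parameter model so that Orbital can construct it from facts supplied in the query.

```taxi
// Hypothetical models, not part of the original schema
parameter model StreamingProviderRequest {
  filmId : FilmId inherits Int
}

model StreamingProviderResponse {
  providerName : ProviderName inherits String
}

// Query: supply a FilmId, and ask for a StreamingProviderResponse.
// Assumption: Orbital builds the request from the supplied fact and
// calls movieQuery (the "streamingprovider" Lambda) to discover the response.
given { filmId : FilmId = 1 }
find { StreamingProviderResponse }
```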

S3

Orbital can use S3 as a source for both reading and writing data.

Reading from S3

To declare a data source that exposes data, take the following steps:

  • Declare a service annotated with @S3Service.
  • Within that service, declare an operation with @S3Operation, providing the bucket name.
  • The operation should take a single parameter, of type FilenamePattern, indicating the filename (or pattern) to read from.

Remember the imports

Be sure to either add the imports for S3Service, S3Operation and FilenamePattern (as shown below), or use their fully qualified names.

Here’s an example:

import com.orbitalhq.aws.s3.S3Service
import com.orbitalhq.aws.s3.S3Operation
import com.orbitalhq.aws.s3.FilenamePattern

@S3Service(connectionName = "MyAwsConnection")
service AwsBucketService {
    @S3Operation(bucket = "MyBucket")
    operation readBucket(filename:FilenamePattern = "films.csv"):Film[]
}

Filename patterns when reading from S3

When reading from S3 files, you can either specify a single filename (e.g., films.csv), or a pattern - such as film*.csv, *.csv, or even just * to read everything in the bucket.

When working with a pattern, Orbital will read and combine all matching files, treating them as a single response.
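
For instance, a prefix pattern such as film*.csv could be declared as the default filename on an operation. This is a minimal sketch that reuses the annotations from the earlier example; the operation name readFilms is illustrative, and Film is assumed to be defined elsewhere in the schema.

```taxi
import com.orbitalhq.aws.s3.S3Service
import com.orbitalhq.aws.s3.S3Operation
import com.orbitalhq.aws.s3.FilenamePattern

@S3Service(connectionName = "MyAwsConnection")
service AwsBucketService {
    @S3Operation(bucket = "MyBucket")
    // Matches film.csv, films.csv, film-2024.csv, and so on.
    // All matching files are read and combined into a single Film[] response.
    operation readFilms(filename:FilenamePattern = "film*.csv"):Film[]
}
```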

For information about supported file formats, see file formats.

Examples - Reading from S3

Each of the below examples will work with the following model of content stored on S3, so we’re defining it once here for brevity:

tradeSummary.taxi
import com.orbitalhq.formats.Csv
import com.orbitalhq.aws.s3.S3Service
import com.orbitalhq.aws.s3.S3Operation
import com.orbitalhq.aws.s3.FilenamePattern


// Define the format we're reading.
// This is a CSV file, so the @Csv annotation is used.
// (Note it's also imported at the top of the file)
@Csv
model TradeSummary {
   symbol : Symbol inherits String
   open : OpenPrice inherits Decimal
   high : HighPrice inherits Decimal
   close : ClosePrice inherits Decimal
}

Reading a single file

s3services.taxi
@S3Service(connectionName = "MyAwsConnection")
service AwsBucketService {
    @S3Operation(bucket = "MyTrades")
    operation readBucket(filename:FilenamePattern = "trades.csv"):TradeSummary[]
}

With the above schema, we can issue a simple query to return the contents of trades.csv:

query.taxi
find { TradeSummary[] }

Reading the contents of multiple files

s3services.taxi
@S3Service(connectionName = "MyAwsConnection")
service AwsBucketService {
    @S3Operation(bucket = "MyTrades")
    // This will read all files ending in csv present in the bucket
    operation readBucket(filename:FilenamePattern = "*.csv"):TradeSummary[]
}

With the above schema, we can issue a simple query to return the contents of all *.csv files in the bucket:

query.taxi
find { TradeSummary[] }

Reading files and exposing as an HTTP endpoint

Using the same service definition as shown above, we can expose the contents of our *.csv files with a query published as an HTTP endpoint:

import taxi.http.HttpOperation

@HttpOperation(url = "/api/q/trades", method = "GET")
find { TradeSummary[] }

For more information, read our guide on publishing queries as endpoints.

Reading files and publishing to Kafka

This example shows how to read a CSV file from S3 and publish each row to Kafka as an individual JSON message.

First, we’ll declare our Kafka broker and associated message format:

kafka.taxi
import com.orbitalhq.kafka.KafkaService
import com.orbitalhq.kafka.KafkaOperation

// The message format we're publishing to Kafka.
// Because there's no format defined, it's JSON by default
model TradeSummaryEvent {
   ticker : Symbol
   // field names and structure are different, but the
   // types are the same as on our source model.
   prices: {
      openPrice : OpenPrice
      highPrice : HighPrice
      closePrice : ClosePrice
   }
}

// Declare our Kafka service and operation
@KafkaService( connectionName = "market-prices" )
service MyKafkaService {

   // Define an operation that writes to Kafka
   @KafkaOperation( topic = "tradeRecords" )
   write operation publishTrades(TradeSummaryEvent):TradeSummaryEvent 
} 

With the above in place, we can write a query that reads from S3, transforms from CSV to our JSON format, and writes it out to Kafka.

query.taxi
find { TradeSummary[] }
call MyKafkaService::publishTrades

In the above example, Orbital detects that the inbound model (TradeSummary) is different from the destination format (TradeSummaryEvent) and handles the transformation for us.

In our example, that’s simply converting from CSV to JSON and restructuring the message. However, the transformation could be richer, doing tasks such as calling services to discover data.
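
To preview the same transformation without publishing anything to Kafka, one option is to project the S3 data onto the target model directly in a query. This sketch uses a TaxiQL projection (as) with the TradeSummary and TradeSummaryEvent models already defined; it is meant as an illustration of the mapping, not as a description of how Orbital performs it internally.

```taxi
// Read every TradeSummary row from S3, and reshape each one
// into the nested TradeSummaryEvent structure.
// Fields are matched by type (Symbol, OpenPrice, ...) rather than by name.
find { TradeSummary[] } as TradeSummaryEvent[]
```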

Finally, we might want to expose an HTTP POST operation to trigger this update:

query.taxi
import taxi.http.HttpOperation

@HttpOperation(url = "/api/q/publishTradeUpdates", method = "POST")
find { TradeSummary[] }
call MyKafkaService::publishTrades

For more information about working with Kafka, including defining connections to brokers, see our dedicated docs on Kafka.

Reading files and saving to a Database

This example shows how to read a CSV file from S3 and save each row as a record in a database.

First, we’ll define our database table and its associated service:

trades-db.taxi
import com.orbitalhq.jdbc.Table
import com.orbitalhq.jdbc.DatabaseService
import com.orbitalhq.jdbc.InsertOperation

@Table(connection = "trades-database", schema = "public" , table = "trades" )
model TradeSummaryRecord {
   symbol : Symbol
   open : OpenPrice
   high : HighPrice
   close : ClosePrice
   timestamp : Instant = now()
}


@DatabaseService(connection = "trades-database")
service TradesDatabase {
   @InsertOperation
   write operation saveTradeSummary(TradeSummaryRecord):TradeSummaryRecord
}

With the above in place, we can write a query that reads from S3, transforms from CSV to our database format, and performs the database inserts:

query.taxi
find { TradeSummary[] }
call TradesDatabase::saveTradeSummary

In the above example, Orbital detects that the inbound model (TradeSummary) is different from the destination format (TradeSummaryRecord) and handles the transformation for us.

In our example, that’s simply converting from CSV to our database format and restructuring the record. However, the transformation could be richer, doing tasks such as calling services to discover data.

Finally, we might want to expose an HTTP POST operation to trigger this update:

query.taxi
import taxi.http.HttpOperation

@HttpOperation(url = "/api/q/publishTradeUpdates", method = "POST")
find { TradeSummary[] }
call TradesDatabase::saveTradeSummary

For more information about working with databases, including defining connections and the support for different database types, see our dedicated docs on Databases.

Writing to S3

To declare an operation that can write data to S3, take the following steps:

  • Declare a service annotated with @S3Service.
  • Within that service, declare a write operation with @S3Operation, providing the bucket name.
  • The operation should take two parameters:
    • One with a @RequestBody annotation, which contains the contents to be written
    • One of type FilenamePattern which defines the filename to write to

Remember the imports

Be sure to either add the imports for S3Service, S3Operation, RequestBody and FilenamePattern (as shown below), or use their fully qualified names.

Here’s an example:

import com.orbitalhq.aws.s3.S3Service
import com.orbitalhq.aws.s3.S3Operation
import com.orbitalhq.aws.s3.FilenamePattern
import com.orbitalhq.aws.s3.RequestBody

@S3Service(connectionName = "MyAwsConnection")
service AwsBucketService {
    @S3Operation(bucket = "MyBucket")
    write operation writeToS3(@RequestBody films:Film[], filename:FilenamePattern = "films.csv"):Film[]
}

Filename patterns when writing to S3

When writing to S3, filename patterns are not supported (unlike when reading).

If you declare a filename with a pattern, an error will be thrown.

Examples - Writing to S3

Each of the below examples will work with the following model of content stored on S3, so we’re defining it once here for brevity:

tradeSummary.taxi
import com.orbitalhq.formats.Csv

// Define the format we're writing.
// This is a CSV file, so the @Csv annotation is used.
// (Note it's also imported at the top of the file)
@Csv
model TradeSummary {
   symbol : Symbol inherits String
   open : OpenPrice inherits Decimal
   high : HighPrice inherits Decimal
   close : ClosePrice inherits Decimal
}

Fetching from an API and writing the results to S3 as a CSV

This example shows data fetched from a REST API (exposed as JSON), and stored onto S3.

As part of the operation, we’ll transform a tree-like JSON structure into a flattened CSV file.

First, we’ll define the API and its response object:

rest-service.taxi
import taxi.http.HttpOperation

model StockPriceUpdate {
   ticker : Symbol
   // field names and structure are different, but the
   // types are the same as on our source model.
   prices: {
      openPrice : OpenPrice
      highPrice : HighPrice
      closePrice : ClosePrice
   }
}

service ApiService {
   @HttpOperation(url="http://myApi.com/prices", method = "GET")
   operation getPrices():StockPriceUpdate[]
}

And, we’ll define a write operation on S3 to store the content:

s3.taxi
import com.orbitalhq.aws.s3.S3Service
import com.orbitalhq.aws.s3.S3Operation
import com.orbitalhq.aws.s3.FilenamePattern
import com.orbitalhq.aws.s3.RequestBody

@S3Service( connectionName = "myAwsConnection" )
service AwsBucketService {
    @S3Operation(bucket = "trades")
    write operation writeTradeSummary(@RequestBody payload: TradeSummary[], filename: FilenamePattern = "trades.csv"):TradeSummary[]
}

Given the above, we can use the following query to read from our API, transform the data, and write to our S3 bucket:

query.taxi
find { StockPriceUpdate[] }
call AwsBucketService::writeTradeSummary

This results in the data returned from our API call being converted to CSV and written to trades.csv in our S3 bucket.

If we’d like to set the filename within our query, we could:

query.taxi
given { filename : FilenamePattern = 'todaysTrades.csv' }
find { StockPriceUpdate[] }
call AwsBucketService::writeTradeSummary

This time, the output is written to todaysTrades.csv.

S3 File formats

In the above examples, our content has been stored in S3 using CSV.

This is defined because the model used in our operations is annotated with @Csv, as shown in the following excerpt:

model.taxi
import com.orbitalhq.formats.Csv

@Csv
model TradeSummary {
  // ... omitted
}

@S3Service( connectionName = "myAwsConnection" )
service AwsBucketService {
    // reading CSV
    @S3Operation(bucket = "MyTrades")
    // This operation returns a collection of
    // TradeSummary objects, which are defined with @Csv
    operation readBucket(filename:FilenamePattern = "*.csv"):TradeSummary[]

    // writing CSV
    @S3Operation(bucket = "trades")
    write operation writeTradeSummary(
      // The request body is a collection
      // of trade summaries, which are configured as CSV
      @RequestBody payload: TradeSummary[], 
      filename: FilenamePattern = "trades.csv"
    ):TradeSummary[]
}

The format can be any supported format, such as Avro, XML, CSV (or any other character-delimited file), or even Protobuf.

If no format is defined, JSON is used as the default.
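
As a sketch of the JSON default, the model below carries no format annotation, so content written through the operation would be stored as JSON. The model name TradeSummaryJson, the operation name, and the filename trades.json are illustrative; the field types reuse those declared on TradeSummary.

```taxi
import com.orbitalhq.aws.s3.S3Service
import com.orbitalhq.aws.s3.S3Operation
import com.orbitalhq.aws.s3.FilenamePattern
import com.orbitalhq.aws.s3.RequestBody

// No @Csv (or other format) annotation, so JSON is used by default
model TradeSummaryJson {
   symbol : Symbol
   open : OpenPrice
   high : HighPrice
   close : ClosePrice
}

@S3Service( connectionName = "myAwsConnection" )
service JsonBucketService {
    @S3Operation(bucket = "trades")
    write operation writeTradeSummaryJson(
      @RequestBody payload: TradeSummaryJson[],
      filename: FilenamePattern = "trades.json"
    ):TradeSummaryJson[]
}
```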

For more information, see Data formats.

SQS

Consuming events

Orbital can subscribe to a stream of data from SQS.

import com.orbitalhq.aws.sqs.SqsService
import com.orbitalhq.aws.sqs.SqsOperation

@SqsService( connectionName = "moviesConnection" )
service MovieService {
  @SqsOperation( queue = "movies" )
  operation streamMovieQuery():Stream<Movie>
}

You can also control the visibility timeout and poll time in the corresponding AWS connection configuration:

aws {
    my-aws-account {
        connectionName=my-aws-account
        accessKey=${?AWS_ACCESS_KEY_ID}
        secretKey=${?AWS_SECRET_ACCESS_KEY}
        region=${AWS_REGION}
        connectionParameters {
          // SQS visibility timeout: the duration (in seconds) that received messages are hidden from subsequent retrieve requests after being retrieved by a ReceiveMessage request. A default value of 300 is used if omitted.
          sqsReceiveRequestWaitTime=500
          // SQS ReceiveMessage request wait time, in seconds. A default value of 10 is used if not specified.
          sqsReceiveRequestWaitTime=10
        }
    }
}

This can then be queried using a standard stream query:

stream { Movie }
// as ... 
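
The trailing comment hints that a stream query can be followed by a projection. A minimal sketch might look like the following, assuming Movie has a field of type MovieTitle (the Movie model is not shown in this section, so that field is hypothetical).

```taxi
// Subscribe to the SQS-backed stream, and reshape each message.
// Assumption: Movie exposes a MovieTitle value (hypothetical type)
stream { Movie } as {
  title : MovieTitle
}[]
```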

Publishing events

Orbital can publish to a queue using a mutation:

import com.orbitalhq.aws.sqs.SqsService
import com.orbitalhq.aws.sqs.SqsOperation

@SqsService( connectionName = "moviesConnection" )
service MovieService {
  @SqsOperation( queue = "movies" )
  write operation publishMovieEvent(Movie):Movie
}

Publishing events can only be invoked in mutations.

Example: Consuming from one SQS queue, and publishing to another

import com.orbitalhq.aws.sqs.SqsService
import com.orbitalhq.aws.sqs.SqsOperation

@SqsService( connectionName = "moviesConnection" )
service MovieService {

  @SqsOperation(queue = "newReleases" )
  operation newReleases():Stream<Movie>

  @SqsOperation( queue = "moviesToReview" )
  write operation publishMovieEvent(Movie):Movie
}

// Query: consume from the new releases queue, and publish to
// a "movies to review" queue
stream { Movie }
call MovieService::publishMovieEvent