Managing data sources

Using AWS services

Before you begin

Before using AWS services, you need to define a connection to AWS in your connections.conf file.

Read more about defining AWS connections here.


DynamoDb

DynamoDb services can only be declared in Taxi.

Querying by key is supported, as are write operations.

To expose a DynamoDb service, the following two annotations are required:

  • Add a @com.orbitalhq.aws.dynamo.Table annotation to the model
  • Add a @com.orbitalhq.aws.dynamo.DynamoService annotation to the service

import com.orbitalhq.aws.dynamo.DynamoService
import com.orbitalhq.aws.dynamo.Table

// connectionName must match the defined connection.
@Table( connectionName = "myAws" , tableName = "reviews" )
model Review {
  @Id
  movieId : MovieId
  score: ReviewScore inherits Int
}

@DynamoService
service DynamoService {
  // Exposes query operations that return Review[]
  table reviews: Review[]

  // allows upsert calls
  @UpsertOperation
  write operation saveReview(Review):Review
}

Write operations (such as upserts) can only be invoked in mutations.
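For example, a sketch of querying the table by key, and of invoking the upsert in a mutation (the movieId and score values are illustrative):

```taxi
// Fetch a single Review by its key
find { Review( MovieId == 123 ) }

// Upsert a Review via the declared write operation
given { review : Review = { movieId : 123 , score : 5 } }
call DynamoService::saveReview
```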

Lambda

Orbital can invoke a Lambda, either to discover data in a query, or in a mutation.

A service must carry the @AwsLambdaService annotation, passing the name of the configured connection.

Operations carry the @LambdaOperation annotation, naming the Lambda function to invoke.

import com.orbitalhq.aws.lambda.AwsLambdaService
import com.orbitalhq.aws.lambda.LambdaOperation

@AwsLambdaService( connectionName = "myAws" )
service MovieDb {
  @LambdaOperation(name = "streamingprovider")
  operation movieQuery(@RequestBody StreamingProviderRequest): StreamingProviderResponse
}
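A sketch of invoking this Lambda in a query: supply a request body as a fact, then ask for the response type, and Orbital invokes the operation to discover it (the request payload is elided here):

```taxi
// Provide the request body as a fact, then query for the response type
given { request : StreamingProviderRequest = { ... } }
find { StreamingProviderResponse }
```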

S3

Orbital can connect to S3 and read a file containing CSV data.

We’re excited to expand the functionality of our S3 adapter. If you have a use case to discuss, please get in touch on Slack.

import com.orbitalhq.formats.Csv
import com.orbitalhq.aws.s3.S3Service
import com.orbitalhq.aws.s3.S3Operation

type Symbol inherits String
type Price inherits Decimal

@Csv
type OrderWindowSummary {
   symbol : Symbol by column(2)
   open : Price by column(3)
   high : Price by column(4)
   close : Price by column(6)
}

@S3Service( connectionName = "myAws" )
service AwsBucketService {
   @S3Operation(bucket = "Orders")
   vyneQl query fetchReports(body:VyneQlQuery): OrderWindowSummary[]
}
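With the service declared, the contents of the CSV file can then be fetched with a standard query:

```taxi
find { OrderWindowSummary[] }
```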

SQS

Consuming events

Orbital can subscribe to a stream of data from SQS.

import com.orbitalhq.aws.sqs.SqsService
import com.orbitalhq.aws.sqs.SqsOperation

@SqsService( connectionName = "moviesConnection" )
service MovieService {
  @SqsOperation( queue = "movies" )
  operation streamMovieQuery():Stream<Movie>
}

This can then be queried using a standard stream query:

stream { Movie }
// as ... 
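Streamed results can also be projected as they arrive. A sketch, assuming hypothetical MovieId and MovieTitle fields on Movie:

```taxi
stream { Movie } as {
   id : MovieId
   title : MovieTitle
}[]
```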

Publishing events

Orbital can publish to a queue using a mutation:

import com.orbitalhq.aws.sqs.SqsService
import com.orbitalhq.aws.sqs.SqsOperation

@SqsService( connectionName = "moviesConnection" )
service MovieService {
  @SqsOperation( queue = "movies" )
  write operation publishMovieEvent(Movie):Movie
}

Write operations that publish events can only be invoked in mutations.
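A sketch of publishing a single event via a mutation (the Movie payload is elided):

```taxi
given { movie : Movie = { ... } }
call MovieService::publishMovieEvent
```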

Example: Consuming from one SQS queue, and publishing to another

import com.orbitalhq.aws.sqs.SqsService
import com.orbitalhq.aws.sqs.SqsOperation

@SqsService( connectionName = "moviesConnection" )
service MovieService {

  @SqsOperation(queue = "newReleases" )
  operation newReleases():Stream<Movie>

  @SqsOperation( queue = "moviesToReview" )
  write operation publishMovieEvent(Movie):Movie
}

// Query: consume from the new releases queue, and publish to
// a "movies to review" queue
stream { Movie }
call MovieService::publishMovieEvent