Managing data sources
Using AWS services
Before you begin
Before using AWS services, you need to define a connection to AWS in your connections.conf
file.
Read more about defining AWS connections here.
Orbital supports reading protobuf messages - either as a message format for HTTP endpoints or from a streaming query originating from Kafka.
You can annotate protobuf sources directly with Taxi metadata, or you can import through the Schema Importer.
DynamoDb
DynamoDb services can only be declared in Taxi.
Querying against key is supported, as is write operations.
To expose a DynamoDb service, the following two annotations are required:
- Add a
@com.orbitalhq.aws.dynamo.Table
to a model - Add a
@com.orbitalhq.aws.dynamo.DynamoService
annotation to the service
import com.orbitalhq.aws.dynamo.DynamoService
import com.orbitalhq.aws.dynamo.Table
// connectionName must match the defined connection.
@Table( connectionName = "myAws" , tableName = "reviews" )
model Review {
@Id
movieId : MovieId
score: ReviewScore inherits Int
}
@DynamoService
service DynamoService {
// Exposes query operations that return Review[]
table reviews: Review[]
// allows upsert calls
@UpsertOperation
write operation saveReview(Review):Review
}
Write operations (such as upserts) can only be invoked in mutations.
Lambda
Orbital can invoke a Lambda, either to discover data in a query, or in a mutation.
A service must have the AwsLambdaService
annotation, passing the configured connection.
Operations have the LambdaOperation
annotation.
import com.orbitalhq.aws.lambda.AwsLambdaService
import com.orbitalhq.aws.lambda.LambdaOperation
@AwsLambdaService( connectionName = "myAws" )
service MovieDb {
@LambdaOperation(name = "streamingprovider")
operation movieQuery(@RequestBody StreamingProviderRequest): StreamingProviderResponse
}
S3
Orbital can connect to S3 and read a file containing CSV data.
We’re excited to expand the functionality of our S3 adapter. If you have a use case to discuss, please get in touch on slack.
import com.orbitalhq.formats.Csv
@Csv
type Orders {
symbol : Symbol by column(2)
open : Price by column(3)
high : Price by column(4)
close : Price by column(6)
}
@S3Service( connectionName = "myAws" )
service AwsBucketService {
@S3Operation(bucket = "Orders")
vyneQl query fetchReports(body:VyneQlQuery): OrderWindowSummary[]
}
SQS
Consuming events
Orbital can subscribe to a stream of data from SQS.
import com.orbitalhq.aws.sqs.SqsService
import com.orbitalhq.aws.sqs.SqsOperation
@SqsService( connectionName = "moviesConnection" )
service MovieService {
@SqsOperation( queue = "movies" )
operation streamMovieQuery():Stream<Movie>
}
This can then be queried using a standard stream
query:
stream { Movie }
// as ...
Publishing events
Orbital can publish to a queue using a mutation:
import com.orbitalhq.aws.sqs.SqsService
import com.orbitalhq.aws.sqs.SqsOperation
@SqsService( connectionName = "moviesConnection" )
service MovieService {
@SqsOperation( queue = "movies" )
write operation publishMoveEvent(Movie):Movie
}
Publishing events can only be invoked in mutations.
Example: Consuming from one SQS topic, and publishing to another
import com.orbitalhq.aws.sqs.SqsService
import com.orbitalhq.aws.sqs.SqsOperation
@SqsService( connectionName = "moviesConnection" )
service MovieService {
@SqsOperation(queue = "newReleases" )
operation newReleases():Stream<Movie>
@SqsOperation( queue = "moviesToReview" )
write operation publishMoveEvent(Movie):Movie
}
// Query: consume from the new releases queue, and publish to
// a "movies to review" queue
stream { Movie }
call MovieService::publishMovieEvent