Building ETL pipelines with Orbital
Deprecated
Pipelines are gradually being replaced by streaming queries.
The pipeline syntax is still supported, but we aim to migrate this functionality to streaming queries. For workloads that are oriented around streaming data (such as reading from Kafka), use a streaming query instead.
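As a rough illustration, a streaming workload that reads events, projects them, and writes each result via a mutation might look like the following TaxiQL sketch (the FilmEvents stream type and the FilmsDatabase::upsertFilm operation are hypothetical names, used here for illustration only):

// Hypothetical names: FilmEvents and FilmsDatabase::upsertFilm are
// illustrative, not part of the pipeline example later in this page.
stream { FilmEvents } as {
  title: FilmTitle
  director: DirectorName
}
call FilmsDatabase::upsertFilm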
Overview
While queries in Orbital are great for on-demand data transformation, Pipelines continuously stream data between a source and a destination, using Orbital to transform the data along the way.
Pipelines work by reading content from a source, transforming it using Orbital, then writing it to a sink.
Pipelines can be defined either in a config file, or using the UI.
Starting the pipeline engine
The pipeline engine ships as a separate Docker image.
A typical docker-compose config looks as follows:
version: "3.3"
services:
  ## Other services omitted
  pipeline-engine:
    image: orbitalhq/pipeline:latest
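With the service defined, it can be started like any other Compose service:

# Start only the pipeline engine (plus any declared dependencies)
docker compose up -d pipeline-engine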
Creating a pipeline
Pipelines are created and stored as a series of HOCON files within a Taxi project. The project's taxi.conf declares where pipeline definitions live:
name: com.demo/pipelines
version: 0.1.0
sourceRoot: src/
additionalSources: {
  "@orbital/config": "orbital/config/*.conf",
  "@orbital/pipelines": "pipelines/*.conf"
}
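Given the globs above, a matching project layout might look like this (file names are illustrative):

.
├── taxi.conf                 # the project file shown above
├── src/                      # Taxi schema sources
├── orbital/
│   └── config/
│       └── pipelines.conf    # Orbital config values
└── pipelines/
    └── file-watch.conf       # one pipeline definition per file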
Pipelines are defined as an Input (transport + spec) and one or more Outputs (transport + spec), with Orbital handling the transformation and enrichment.
// An example pipeline, which watches a local directory for new files.
// Each file's content is transformed and enriched by Orbital,
// then written to a database.
pipelines: [
  {
    "name": "Filewatcher to db",
    "id": "file-watch",
    "input": {
      "type": "fileWatcher",
      "direction": "INPUT",
      "typeName": "Film",
      "path": ${config.pipelines.watchedPath}
    },
    "transformation": """
      find { Films } as {
        title: FilmTitle
        director: DirectorName
        imdbId: ImdbId
        duration: DurationMillis? = first(TrackObject[]) as DurationMillis
      }
    """,
    "outputs": [
      {
        "type": "jdbc",
        "direction": "OUTPUT",
        "connection": "films",
        "tableName": "Films"
      }
    ]
  }
]
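The ${config.pipelines.watchedPath} reference in the input is a standard HOCON substitution. As a sketch, assuming the value is supplied by a file matched by the @orbital/config glob declared in taxi.conf, it could look like this (the path value is illustrative):

// orbital/config/pipelines.conf (illustrative)
config {
  pipelines {
    // Directory the fileWatcher input polls for new files
    watchedPath: "/opt/data/incoming"
  }
}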
Pipelines listen on their input and, as messages arrive, pass them to Orbital to be transformed into the output models. Orbital's query engine then enriches the result, populating any missing data.
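For example, given the pipeline above, an incoming film record and the row written to the Films table might look like this (all values are illustrative; duration is not present in the source file, and is enriched by Orbital from the film's tracks):

// Illustrative file content read by the fileWatcher input
{ "title": "Gladiator", "director": "Ridley Scott", "imdbId": "tt0172495" }

// Illustrative row written to the Films table; duration was
// resolved by Orbital's query engine via first(TrackObject[])
{ "title": "Gladiator", "director": "Ridley Scott", "imdbId": "tt0172495", "duration": 9300000 }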
Although all pipelines are persisted as HOCON files, they can be created either through the UI, or authored as HOCON directly.
Creating a pipeline through configuration
Pipelines are authored and stored in HOCON files, one file per pipeline.
Files should adhere to the pipeline spec, which has its own dedicated reference documentation.