Building ETL pipelines with Orbital

Deprecated

Pipelines are gradually being replaced by streaming queries.

The pipeline syntax is still supported, but we plan to migrate its functionality to streaming queries.
For workloads that are oriented around streaming data (such as reading from Kafka), use a streaming query instead.

Overview

Whereas queries in Orbital are great for on-demand data transformation, pipelines continuously stream data from a source to a destination, using Orbital to transform the data along the way.

Pipelines work by reading content from a source, transforming it using Orbital, then writing the result to a sink.
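The read, transform, write loop described above can be sketched in a few lines of Python. This is a conceptual model only: `run_pipeline`, the `Source`/`Transform`/`Sink` aliases and the message shapes are hypothetical and are not part of Orbital, which runs this loop inside the pipeline engine and performs the transform step with a Taxi query.

```python
from typing import Callable, Iterable, List

# Hypothetical aliases for illustration only; these are not Orbital APIs.
Message = dict
Source = Iterable[Message]
Transform = Callable[[Message], Message]
Sink = Callable[[Message], None]


def run_pipeline(source: Source, transform: Transform, sinks: List[Sink]) -> int:
    """Read each message from the source, transform it, and write the
    result to every sink. Returns the number of messages processed."""
    count = 0
    for message in source:
        result = transform(message)
        for write in sinks:  # one or more outputs receive the same result
            write(result)
        count += 1
    return count


# Usage: an in-memory source and a list-backed sink.
written: List[Message] = []
processed = run_pipeline(
    source=[{"title": "Alien"}, {"title": "Heat"}],
    transform=lambda m: {"filmTitle": m["title"].upper()},
    sinks=[written.append],
)
```

A real source such as Kafka or a watched directory never ends, so in practice the loop runs for the lifetime of the pipeline rather than returning a count.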

Pipelines can be defined either in a config file or through the UI.

Starting Pipeline Engine

The pipeline engine ships as a separate Docker image.

A typical docker-compose configuration looks like this:

version: "3.3"
services:
  ## Other services omitted
  pipeline-engine:
    image: orbitalhq/pipeline:latest

Creating a pipeline

Pipelines are created and stored as a series of HOCON files within a Taxi project. The additionalSources entry in the project's taxi.conf tells the project where to find them:

taxi.conf
name: com.demo/pipelines
version: 0.1.0
sourceRoot: src/
additionalSources: {
     "@orbital/config" : "orbital/config/*.conf",
     "@orbital/pipelines" : "pipelines/*.conf"
}

Pipelines are defined as an input (transport + spec) and one or more outputs (transport + spec), with Orbital handling the transformation and enrichment.
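To make the shape of a definition concrete, here is a hypothetical Python model of the keys used in the example pipeline.conf below (`name`, `id`, `input`, `transformation`, `outputs`). It is a sketch assuming only those keys and the `INPUT`/`OUTPUT` directions; the authoritative structure is the pipeline spec reference, and classes such as `PipelineSpec` and `TransportSpec` do not exist in Orbital.

```python
from dataclasses import dataclass, field
from typing import List


# Hypothetical model mirroring the keys in the example pipeline.conf;
# it is not Orbital's own pipeline spec.
@dataclass
class TransportSpec:
    type: str          # e.g. "fileWatcher" or "jdbc"
    direction: str     # "INPUT" or "OUTPUT"
    options: dict = field(default_factory=dict)  # transport-specific settings


@dataclass
class PipelineSpec:
    name: str
    id: str
    input: TransportSpec
    transformation: str          # Taxi query applied between input and outputs
    outputs: List[TransportSpec]

    def __post_init__(self) -> None:
        # Enforce one input and one or more outputs, each with the right direction.
        if self.input.direction != "INPUT":
            raise ValueError("input transport must have direction INPUT")
        if not self.outputs:
            raise ValueError("a pipeline needs at least one output")
        for out in self.outputs:
            if out.direction != "OUTPUT":
                raise ValueError("output transports must have direction OUTPUT")


# The example pipeline, with its transformation query shortened.
spec = PipelineSpec(
    name="Filewatcher to db",
    id="file-watch",
    input=TransportSpec("fileWatcher", "INPUT", {"typeName": "Film"}),
    transformation="find { Films } as { title: FilmTitle }",
    outputs=[TransportSpec("jdbc", "OUTPUT", {"connection": "films", "tableName": "Films"})],
)
```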

pipeline.conf
// An example pipeline, which watches a local directory for new files.
// Each new file is transformed using the query below,
// and the result is written to a database table.
pipelines: [{
      "name": "Filewatcher to db",
      "id" : "file-watch",
      "input": {
        "type": "fileWatcher",
        "direction": "INPUT",
        "typeName" : "Film",
        "path" : ${config.pipelines.watchedPath}
      },
      "transformation" : """
          find { Films } as { 
            title: FilmTitle
            director: DirectorName
            imdbId: ImdbId
            duration: DurationMillis? = first(TrackObject[]) as DurationMillis 
          }
      """,
      "outputs": [{
        "type": "jdbc",
        "direction": "OUTPUT",
        "connection": "films",
        "tableName": "Films"
      }]
}]
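The fileWatcher input in this example reacts to new files appearing in the watched directory. As a rough illustration of that behaviour, here is a polling watcher in Python. It assumes only newly created files matter and is not how Orbital implements the transport; `poll_new_files` is a hypothetical helper, and a production watcher would more likely use OS file-change notifications than repeated directory scans.

```python
import os
import tempfile
from typing import List, Set


def poll_new_files(directory: str, seen: Set[str]) -> List[str]:
    """Return paths of regular files in `directory` that were not present
    on any previous poll, and record them in `seen`."""
    new_files = []
    for name in sorted(os.listdir(directory)):
        path = os.path.join(directory, name)
        if os.path.isfile(path) and path not in seen:
            seen.add(path)
            new_files.append(path)
    return new_files


# Usage: each poll reports only files created since the previous poll.
with tempfile.TemporaryDirectory() as watched:
    seen: Set[str] = set()
    open(os.path.join(watched, "a.json"), "w").close()
    first = poll_new_files(watched, seen)
    open(os.path.join(watched, "b.json"), "w").close()
    second = poll_new_files(watched, seen)
```

Each path returned by a poll would then be read, parsed as the input type (Film in the example) and handed to the transformation.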

Pipelines listen on their input; as messages arrive, each one is passed to Orbital to be transformed into the output model.

As part of that transformation, Orbital’s query engine enriches the data, populating any fields that the incoming message does not provide.
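The enrichment step can be pictured as: copy what the message already has, then look up whatever is missing. In Orbital that lookup is driven by the query engine and the schema; the Python sketch below replaces it with a hand-written map of lookup functions, so `enrich`, the lookup map and the catalogue data are all hypothetical.

```python
from typing import Callable, Dict, List, Optional

# Hypothetical lookup: given the fields known so far, try to fetch a missing one.
Lookup = Callable[[Dict[str, object]], Optional[object]]


def enrich(message: Dict[str, object], target_fields: List[str],
           lookups: Dict[str, Lookup]) -> Dict[str, object]:
    """Build the output model: copy fields present in the message, and call
    the registered lookup for each missing field."""
    result: Dict[str, object] = {}
    for name in target_fields:
        if name in message:
            result[name] = message[name]
        elif name in lookups:
            # Pass everything known so far, so lookups can use earlier fields.
            result[name] = lookups[name]({**message, **result})
        else:
            result[name] = None  # nothing can supply this field
    return result


# Usage: the source message lacks a duration, which a (hypothetical)
# catalogue keyed by imdbId can provide.
durations = {"example-id": 7_020_000}
film = enrich(
    {"title": "Alien", "imdbId": "example-id"},
    target_fields=["title", "imdbId", "duration"],
    lookups={"duration": lambda known: durations.get(known["imdbId"])},
)
```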

Although all pipelines are persisted as HOCON files (HOCON is a superset of JSON), they can be created either through the UI or authored directly.

Creating a pipeline through configuration

Pipelines are authored and stored in HOCON (.conf) files, with one file per pipeline.

Files should adhere to the pipeline spec, which has its own dedicated reference documentation.
