Data Transports

Introduction

(update: this feature is being phased out and replaced by Data Imports)

Data Transports lets you move data from your production databases to your analytics/reporting database without having to write much code. It copies database tables of your choice from a source database to a destination database.

Currently we support the following transport sources:

  • Amazon Redshift
  • Google BigQuery
  • MySQL
  • MongoDB
  • Microsoft SQL Server
  • Postgres

And the following destination databases:

  • Amazon Redshift
  • Google BigQuery
  • Postgres
  • MySQL
  • SQL Server

Setup

This feature requires the Holistics CLI, so make sure you have set it up.

Logging In

Log in with the command below. You can get the token in Holistics (go to Settings, under the API Key section):

$ holistics login <token>
Authenticating token...
Authentication successful. Info:
- ID: 1
- Email: [email protected]

Listing Data Sources

Run the following command to list all data sources. Note the ID of the data source you want so that you can specify it in the configuration file:

$ holistics ds_list
Listing all data sources...
| ID | Type       | Name          |
|----+------------+---------------|
| 1  | postgresql | Production DB |
| 2  | redshift   | Analytics DB  |

Transporting Data

For help on the syntax, type:

$ holistics help transport

Basic Use-case

For example, to copy table public.users from Postgres to Redshift, simply run:

$ holistics transport submit --from-ds-id=1 --dest-ds-id=2 -t public.users

The above command automatically deduces the columns and types of the destination Redshift table based on the original Postgres table.
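For illustration, if public.users in Postgres were defined with columns id (integer) and email (text), the deduced Redshift table might look roughly like the following (an illustrative assumption; the actual deduced types can differ):

```sql
-- Sketch of the auto-deduced destination table
CREATE TABLE public.users (
  id    integer,
  email varchar(255)
);
```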

It, however, doesn't allow you to make any special adjustments, such as specifying Redshift's sort keys, distribution keys, or compression types.

For such things, please refer to the advanced example below.

Advanced Use-cases

The following example transports data from MySQL to Amazon Redshift, extracting page view data from table events into destination table public.pageviews.

Table events' schema structure:

mysql> describe events;
+-----------+------------+------+-----+---------+-------+
| Field     | Type       | Null | Key | Default | Extra |
+-----------+------------+------+-----+---------+-------+
| timestamp | bigint(20) | NO   |     | NULL    |       |
| body      | json       | NO   |     | NULL    |       |
+-----------+------------+------+-----+---------+-------+
2 rows in set (0.00 sec)

Command:

$ holistics transport submit -c pageviews.json

File pageviews.json

{
  "from_ds_id": "50-mysql",
  "from_table_name": "events",
  "dest_ds_id": "48-redshift",
  "dest_table_name": "public.pageviews",
  "columns": [
    {
      "column_name": "timestamp",
      "data_type": "bigint"
    },
    {
      "column_name": "page",
      "data_type": "varchar(15)",
      "source_expression": "json_extract(body, '$.page')"
    },
    {
      "column_name": "user_id",
      "data_type": "int",
      "source_expression": "json_extract(body, '$.user_id')"
    }
  ],
  "where_clause": "json_extract(body, '$.event') = 'pageview'"
}

The above example will translate into the following SQL query to be executed against the original database (MySQL):

SELECT
  timestamp AS `timestamp`,
  json_extract(body, '$.page') AS `page`,
  json_extract(body, '$.user_id') AS `user_id`
FROM events
WHERE (json_extract(body, '$.event') = 'pageview')

Once the result is returned, it'll be loaded into the destination database table with the structure you've specified above.

Incremental Transport

By default, the whole table is copied to the destination database, replacing the existing table. This simple approach works well for small tables, but as your tables grow, it makes more sense to transport only the difference (the recently updated data).

For incremental transport to work, an incremental column has to be specified and present in both source and destination tables. Holistics uses the max value of this column to decide which records need to be moved over.

Specify 2 configs in the JSON template:

{
  ...

  "default_mode": "incremental",
  "increment_column": "id",

  ...
}

Then run the transport command as usual. On the first run, if the destination table does not exist, a full transport is performed. On subsequent runs, incremental transport kicks in.

You can also override the transport mode from the command line:

$ holistics transport submit -c table.json --full
$ holistics transport submit -c table.json --incremental

Incremental Transport with UI

Here is the screenshot of our upsert feature:

Incremental column: only loads source records whose value in this column is greater than that of every record in the destination table. Sample use case: if you set updated_at as the incremental column, only source records updated later than all records in the destination will be loaded.

Primary key column: helps us delete any outdated records quickly and accurately. Note that this column is not required, but specifying an invalid column (one that does not contain primary keys) may result in data corruption.
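Conceptually, the upsert with a primary key column can be sketched with SQL like the following (illustrative only; it assumes a destination table public.pageviews with primary key id and a staging table pageviews_staging holding the newly extracted rows — the actual statements Holistics executes may differ):

```sql
-- Remove destination rows superseded by the incoming batch
DELETE FROM public.pageviews
WHERE id IN (SELECT id FROM pageviews_staging);

-- Insert the fresh rows
INSERT INTO public.pageviews
SELECT * FROM pageviews_staging;
```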

Adding Indexes to Destination Table

Note: right now this feature only supports Postgres destinations.

After moving the table to destination, if you want to add indexes to the table, you can specify the indexes in the indexes section of the JSON:

{
  ...

  "columns": [
    {
      "column_name": "date_d",
      "data_type": "date"
    },
    {
      "column_name": "page",
      "data_type": "varchar(15)"
    },
    {
      "column_name": "user_id",
      "data_type": "int"
    }
  ],
  "indexes": [
    { "columns": ["date_d"] },
    { "columns": ["page", "user_id"] }
  ]
}

The above JSON will create 2 indexes, one on date_d field, and one composite index on (page, user_id).
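Assuming the destination table is public.pageviews, this is roughly equivalent to running the following statements on the Postgres destination (shown for illustration; index names are generated by Postgres):

```sql
CREATE INDEX ON public.pageviews (date_d);
CREATE INDEX ON public.pageviews (page, user_id);
```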

Loading Data from MongoDB Source

Holistics Transport supports loading data from MongoDB (NoSQL) into your SQL reporting database. Due to the NoSQL nature of MongoDB, here are a few things to take note of.

  • For nested objects, you can use MongoDB dot notation to navigate to the desired value.
  • When specifying where_clause, you need to specify a JSON object similar to the query document passed to MongoDB's find() method.

See an example of a transport file based on the MongoDB sample restaurants dataset:

{
  "from_ds_id": "31-mongo-source",
  "from_table_name": "restaurants",
  "dest_ds_id": "11-pg-reporting-db",
  "dest_table_name": "public.restaurants",
  "columns": [
    {
      "column_name": "restaurant_id",
      "data_type": "int"
    },
    {
      "column_name": "name",
      "data_type": "varchar"
    },
    {
      "column_name": "borough",
      "data_type": "varchar"
    },
    {
      "column_name": "zipcode",
      "data_type": "varchar",
      "source_expression": "address.zipcode"
    }
  ],
  "default_mode": "incremental",
  "increment_column": "restaurant_id",
  "where_clause": {
    "borough": { "$ne": "Brooklyn" }
  }
}

Note: if you're using incremental transport on MongoDB, and your increment column is of type datetime, please add the following to your transport JSON: "increment_column_type": "datetime"
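For example, with an illustrative datetime column named created_at:

```json
{
  ...

  "default_mode": "incremental",
  "increment_column": "created_at",
  "increment_column_type": "datetime",

  ...
}
```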

Config File’s Parameters

Parameters

from_ds_id, dest_ds_id

(required) ID/slug of the source and destination data sources.

from_table_name, dest_table_name

(required) Fully-qualified names of the source and destination tables.

columns

(required) An array of hashes representing the column schema of the destination table.

where_clause

(optional) Additional WHERE condition in case you want to apply more filters to the original table.

default_mode

(optional) Takes either full or incremental. Defaults to full.

increment_column

(required when mode is incremental) The column used for incremental transport. This column must be present in both source and destination tables.

copy_options (string)

Extra parameters to be included in the COPY command used to load data into the destination table. Note that this option only works when the destination data source is Amazon Redshift.

COPY <table_name> FROM <s3_path>
...
<copy_options>
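For example, to tolerate over-length values and invalid UTF-8 characters during loading (TRUNCATECOLUMNS and ACCEPTINVCHARS are standard Redshift COPY parameters):

```json
{
  ...

  "copy_options": "TRUNCATECOLUMNS ACCEPTINVCHARS",

  ...
}
```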

Column's parameters

column_name

(required) Name of the destination table's column.

data_type

(optional) String value, the column's data type. Defaults to varchar(255) if not specified.

source_expression

(optional) String value containing the SQL expression executed to retrieve the value for the current column. Defaults to column_name (i.e. extracting the column's value) if not specified.

Redshift-only parameters

Distribution Key: Read more about Redshift's distribution keys.

  • dist_style: string value, specifying distribution style. Values: ALL, EVEN, KEY
  • dist_key: string value, if dist_style is KEY, specifying distribution column

Sort Keys: Read more about Redshift's sort keys.

  • sort_style: string value, denoting sort style. Values: single, compound, interleaved
  • sort_keys: string or array of string value, denoting the column(s) as sort keys
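Putting these together, a config might look like the following sketch (it assumes these parameters sit at the top level of the JSON, alongside columns, and that user_id and date_d are columns of the destination table):

```json
{
  ...

  "dist_style": "KEY",
  "dist_key": "user_id",
  "sort_style": "compound",
  "sort_keys": ["date_d", "user_id"],

  ...
}
```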

Compression Types: For each column, you can optionally specify the column's compression type (encoding). Read more about Redshift's compression encodings.

{
  "columns": [
    {
      "column_name": "ts",
      "data_type": "bigint",
      "compression_type": "delta"
    },
    {
      "column_name": "page",
      "data_type": "varchar(15)",
      "source_expression": "json_extract(body, '$.page')",
      "compression_type": "lzo"
    }
  ]
}