Source Extensions (Plugins)

  1. Overview
  2. Quickstart
  3. Project Structure
  4. Source Extension Interface
  5. Source Config
  6. Catalog of Streams
  7. Pulling data
  8. Descriptor
  9. Testing
  10. Adding to Jitsu
  11. Advanced

Overview

Jitsu Source Plugins allow anyone to implement a new source type for Jitsu and publish it to make it available for all users of Jitsu.

Source Plugins are designed to pull data from third-party services. Each source may expose multiple types of data streams, e.g. users and events.

Quickstart

Use the Jitsu SDK's CLI tool to bootstrap a project for a new source plugin:

npx jitsu-cli extension create --type source

Node.js and npx must be installed on your system.

jitsu-cli creates a project for a source plugin. All parts work, but they are placeholder implementations that don't do anything meaningful: the sample source project simply returns one data row with run_number and the configuration parameters.

If you are an experienced developer, you can start replacing placeholder logic with your own right away.

Project Structure

jitsu-cli generates a project directory structure with a set of files typical for a TypeScript Node.js project:

├── package.json
├── tsconfig.json
├── src
│   └── index.ts
└── __test__

  • package.json - contains meta-information about the npm project, including name and version
  • src/index.ts - contains the main logic along with the plugin descriptor, config objects, and source stream catalog
  • __test__/ - directory for tests
  • tsconfig.json - settings for the TypeScript compiler. No need to change it

Source Extension Interface

A Jitsu Source Extension must export the following symbols:

  • descriptor - instance of ExtensionDescriptor, describes the extension: name, icon, config params
  • validator - instance of ConfigValidator, validates the source config
  • sourceCatalog - instance of SourceCatalog, describes the data streams the source provides
  • streamReader - instance of StreamReader, pulls data from the third-party service and sends it to Jitsu

For more on these types, see the files with full type descriptions or the @jitsu/types npm package.

To implement these types, we first need to define our own types for the Source config and the Stream config.

Source Config

Source Config must contain the properties required to connect to the third-party service, e.g. userId and password, or apiKey, or, for OAuth 2.0, clientId, clientSecret, accessToken, and so on. It may also contain data stream parameters that are common to all of the source's stream types, e.g. start_date and refresh_window.

From here on we will use the Airtable source implementation for examples:

export interface AirtableConfig {
  //API Key. Read on how to get API key here: https://support.airtable.com/hc/en-us/articles/219046777-How-do-I-get-my-API-key-
  apiKey: string;
  //Base ID. Read how to get Base ID: https://support.airtable.com/hc/en-us/articles/4405741487383-Understanding-Airtable-IDs
  baseId: string;
}

If you're using Jitsu Configurator, those parameters will be displayed properly in the UI.

Validating Config

Once we have the source config, we implement the validator. The validator is optional, but highly recommended.

In the validator we need to check whether the provided config grants access to the API.

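async function validator(config: AirtableConfig): Promise<ConfigValidationResult> {
  console.log(`Will connect to airtable with apiKey=${config.apiKey} and baseId=${config.baseId}`);
  // Listing the tables of the base checks both the API key and the base ID in one request
  const res = await fetch("https://api.airtable.com/v0/meta/bases/" + config.baseId + "/tables", {
    method: "get",
    headers: [["Authorization", "Bearer " + config.apiKey]],
  });
  // The Content-Type header may be missing, so fall back to an empty string before matching
  const contentType = res.headers?.get("Content-Type") ?? "";
  if (contentType.indexOf("application/json") >= 0) {
    const json = await res.json();
    if (json.error) {
      // Airtable reports errors either as a plain string or as an object with type and message
      return { ok: false, message: `${json.error?.type || json.error} ${json.error?.message || ""}`.trim() };
    } else {
      return true;
    }
  } else {
    // Non-JSON response: report the HTTP status and the raw body
    return { ok: false, message: `Error Code: ${res.status} msg: ${await res.text()}` };
  }
}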

Now we can test our validator with the validate-config action that jitsu-cli already added to the project:

yarn build && yarn validate-config -c '{"apiKey":"keyMy", "baseId":"app1234"}'

Alternatively, the config may be stored in a JSON file:

yarn build && yarn validate-config -c config.json

If everything is fine, we should get the following output:

[info ] - 🤔 Validating configuration {"apiKey":"keyMy", "baseId":"app1234"}
Will connect to airtable with apiKey=keyMy and baseId=app1234
[info ] - ✅ Config is valid. Hooray!
[info ] - ✨ Done
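
The decision logic of the validator can also be separated from the network call, which makes it testable without an Airtable account. The following is a minimal sketch under that assumption; `ValidationResult` and `interpretAirtableResponse` are hypothetical names used for illustration and are not part of `@jitsu/types`.

```typescript
// Hypothetical local stand-in for the object form of the validation result.
type ValidationResult = { ok: boolean; message?: string };

// Pure helper: decides whether a response means the config is valid.
// contentType is null when the server sent no Content-Type header.
function interpretAirtableResponse(
  contentType: string | null,
  status: number,
  body: string
): ValidationResult {
  if (contentType !== null && contentType.includes("application/json")) {
    let json: any;
    try {
      json = JSON.parse(body);
    } catch {
      return { ok: false, message: `Error Code: ${status} msg: invalid JSON` };
    }
    if (json && json.error) {
      // The error may be a plain string or an object with type and message
      const type = json.error.type || json.error;
      const message = json.error.message || "";
      return { ok: false, message: `${type} ${message}`.trim() };
    }
    return { ok: true };
  }
  // Non-JSON response: report status and raw body
  return { ok: false, message: `Error Code: ${status} msg: ${body}` };
}
```

A validator built this way would pass the header value, status, and body text of the fetch response to the helper and return its result.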

Catalog of Streams

The source extension needs a way to tell Jitsu what types of streams it supports and how those streams should be configured.

Think of streams as tables in a database. In fact, each stream will be represented as a table in the destination database.

First we need to declare a type for the stream config:

export interface TableStreamConfig {
  //Table Id. Read how to get table id: https://support.airtable.com/hc/en-us/articles/4405741487383-Understanding-Airtable-IDs
  tableId: string;
  //View Id (Optional). Read how to get view id: https://support.airtable.com/hc/en-us/articles/4405741487383-Understanding-Airtable-IDs
  viewId?: string;
  //Fields. Comma separated list of field names. If empty or undefined - all fields will be downloaded
  fields?: string;
}

And then declare sourceCatalog function that returns an array of StreamPrototype type:

const sourceCatalog: SourceCatalog<AirtableConfig, TableStreamConfig> = async (config: AirtableConfig) => {
  return [
    {
      type: "table",
      supportedModes: ["full_sync"],
      params: [
        {
          id: "tableId",
          displayName: "Table Id",
          documentation:
                  "Read how to get table id: https://support.airtable.com/hc/en-us/articles/4405741487383-Understanding-Airtable-IDs",
          required: true,
        },
        {
          id: "viewId",
          displayName: "View Id",
          documentation:
                  "Read how to get view id: https://support.airtable.com/hc/en-us/articles/4405741487383-Understanding-Airtable-IDs",
          required: false,
        },
        {
          id: "fields",
          displayName: "Fields",
          documentation: "Comma separated list of field names. If empty or undefined - all fields will be downloaded",
          required: false,
        },
      ],
    },
  ];
};

In this example, the source exports one stream type (called table), which is configurable with the tableId, viewId and fields parameters. A user can configure multiple instances of one stream type. In the case of Airtable, each stream represents a view of a table, optionally restricted to a subset of fields (see the fields parameter).

Pulling data

The main logic happens in the streamReader function. This function pulls data from the third-party API and sends it to Jitsu Server as messages through the StreamSink interface (see the detailed description of all message types below).

const streamReader: StreamReader<AirtableConfig, TableStreamConfig> = async (
        sourceConfig: AirtableConfig,
        streamType: string,
        streamConfiguration: StreamConfiguration<TableStreamConfig>,
        streamSink: StreamSink,
        services: { state: StateService }
) => {
  const airtable = new Airtable({ apiKey: sourceConfig.apiKey });

  let table = airtable.base(sourceConfig.baseId).table(streamConfiguration.parameters.tableId);
  const selectedFields = streamConfiguration.parameters.fields
          ? streamConfiguration.parameters.fields.split(",").map(f => f.trim())
          : [];
  let selectParams: QueryParams<any> = {};
  if (selectedFields.length > 0) {
    streamSink.log("INFO", "Fields filter: " + JSON.stringify(selectedFields));
    selectParams.fields = selectedFields;
  }
  if (streamConfiguration.parameters.viewId) {
    selectParams.view = streamConfiguration.parameters.viewId;
  }
  let allRecords = await table.select(selectParams).all();
  allRecords.forEach(r => {
    const { id, createdTime, fields } = r._rawJson;
    //add record message
    streamSink.addRecord({
      $id: id,
      $recordTimestamp: new Date(createdTime),
      __sql_type_created: "timestamp with time zone",
      ...fields,
    });
  });
};
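
The reader above splits the fields parameter on commas and trims each name. A slightly more defensive variant, sketched below, also drops empty names, which would otherwise appear for input with a trailing or doubled comma. `parseFields` is a hypothetical helper name, not part of the Jitsu SDK.

```typescript
// Hypothetical helper: turns a comma separated list into clean field names.
// Empty or undefined input yields an empty list, meaning "all fields".
function parseFields(fields?: string): string[] {
  if (!fields) return [];
  return fields
    .split(",")
    .map(f => f.trim())
    .filter(f => f.length > 0);
}
```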

Descriptor

The descriptor object is used by Jitsu to build the Source UI form with proper descriptions and documentation:

const descriptor: ExtensionDescriptor = {
  id: "airtable",
  displayName: "Airtable Source",
  icon: "<svg xmlns=\"http://www.w3.org/2000/svg\" viewBox=\"0 0 64 64\" width=\"100%\" height=\"100%\"><path d=\"M28.578 5.906L4.717 15.78c-1.327.55-1.313 2.434.022 2.963l23.96 9.502a8.89 8.89 0 0 0 6.555 0l23.96-9.502c1.335-.53 1.35-2.414.022-2.963l-23.86-9.873a8.89 8.89 0 0 0-6.799 0\" fill=\"#fc0\"/><path d=\"M34.103 33.433V57.17a1.6 1.6 0 0 0 2.188 1.486l26.7-10.364A1.6 1.6 0 0 0 64 46.806V23.07a1.6 1.6 0 0 0-2.188-1.486l-26.7 10.364a1.6 1.6 0 0 0-1.009 1.486\" fill=\"#31c2f2\" /> <path d=\"M27.87 34.658l-8.728 4.215-16.727 8.015c-1.06.512-2.414-.26-2.414-1.44V23.17c0-.426.218-.794.512-1.07a1.82 1.82 0 0 1 .405-.304c.4-.24.97-.304 1.455-.112l25.365 10.05c1.3.512 1.4 2.318.133 2.925\" fill=\"#ed3049\" /><path d=\"M27.87 34.658l-7.924 3.826L.512 22.098a1.82 1.82 0 0 1 .405-.304c.4-.24.97-.304 1.455-.112l25.365 10.05c1.3.512 1.4 2.318.133 2.925\" fill=\"#c62842\" /></svg>",
  description: "This source pulls data from Airtable base",
  configurationParameters: [
    {
      id: "apiKey",
      displayName: "API Key",
      documentation:
              "Read on how to get API key here: https://support.airtable.com/hc/en-us/articles/219046777-How-do-I-get-my-API-key-",
      required: true,
    },
    {
      id: "baseId",
      displayName: "Base Id",
      documentation:
              "Read how to get Base ID: https://support.airtable.com/hc/en-us/articles/4405741487383-Understanding-Airtable-IDs",
      required: true,
    },
  ],
};

Testing

Normally Jitsu Server is needed to run a Source extension, but jitsu-cli created our project with an execute action that runs the source from the command line with the provided configs. Pass the Source Config JSON with the -c parameter and the StreamConfig with -s. If the source supports multiple stream types, pass the stream type in the stream parameter of the StreamConfig object:

yarn execute -c '{apiKey: "keyMy", baseId:"app1234"}' -s '{stream: "table", tableId:"tblMy"}'

The result will look like this:

[info ] - 🏃 Getting available streams...
[info ] - 🌊 Stream: table. Parameters: tableId - Table Id, viewId - View Id, fields - Fields
[info ] - [record] {"$id":"rec55g5x7GyRkz0Ex","$recordTimestamp":"2020-06-11T01:30:22.000Z","Client":["recNvBXPsvaiu5QaH"],"Due date":"2020-11-01","Project lead":{"id":"usrSdmrY5yGdbcZzg","email":"katherineduh+collab23@demo.htable.com","name":"Leslie Walker"},"Category":"Technology design","Name":"Lemon headband","Project team":[{"id":"usr6hWARwVNgmt3WW","email":"katherineduh+collab35@demo.htable.com","name":"Sam Epps"},{"id":"usru7j5m2lcNhriKv","email":"katherineduh+collab7@demo.htable.com","name":"Cameron Toth"},{"id":"usrSdmrY5yGdbcZzg","email":"katherineduh+collab23@demo.htable.com","name":"Leslie Walker"}],"Tasks":["recLLYKORkbdzQV1g","recUyxiAcW4x56HBH"],"Kickoff date":"2020-10-18"}
[info ] - 🏁 Result data:
[
  {
    "$id": "rec55g5x7GyRkz0Ex",
    "$recordTimestamp": "2020-06-11T01:30:22.000Z",
    "Client": "[\"recNvBXPsvaiu5QaH\"]",
    "Due date": "2020-11-01",
    "Project lead": {
      "id": "usrSdmrY5yGdbcZzg",
      "email": "katherineduh+collab23@demo.htable.com",
      "name": "Leslie Walker"
    },
    "Category": "Technology design",
    "Name": "Lemon headband",
    "Project team": "[{\"id\":\"usr6hWARwVNgmt3WW\",\"email\":\"katherineduh+collab35@demo.htable.com\",\"name\":\"Sam Epps\"},{\"id\":\"usru7j5m2lcNhriKv\",\"email\":\"katherineduh+collab7@demo.htable.com\",\"name\":\"Cameron Toth\"},{\"id\":\"usrSdmrY5yGdbcZzg\",\"email\":\"katherineduh+collab23@demo.htable.com\",\"name\":\"Leslie Walker\"}]",
    "Tasks": "[\"recLLYKORkbdzQV1g\",\"recUyxiAcW4x56HBH\"]",
    "Kickoff date": "2020-10-18"
  }
]
[info ] - ✨ Done

First, jitsu-cli prints all messages received from the source. Then it prints an array of all received objects.

Adding to Jitsu

To add our plugin to Jitsu we need to build and publish it.

To build the plugin code, use:

yarn build

Publishing to NPM Repository

Publish the plugin to the public npm repository to make it available to other users. You need an account at https://www.npmjs.com

The following commands in the project directory will publish the package to the npmjs repository:

npm login
npm publish

npm will ask for some additional details to complete the publishing.

Setting up Jitsu Server

Users of a standalone Jitsu server can set up a plugin-based source since version 1.42. In the eventnative.yaml file, add a new source of type sdk_source together with the plugin package name, the package version, and the config:

sources:
...
  myairtable:
    type: sdk_source
    destinations:
      - target_destination
    collections:
      - name: table
        type: table
        table_name: myairtable_table
        schedule: '@daily'
        parameters:
          tableId: tblMy
    schedule: '@daily'
    config:
      apiKey: keyMy
      baseId: app1234
      package_name: jitsu-airtable-source
      package_version: ^0.7.2

Setting up Jitsu Joint Image or Configurator UI

The Configurator doesn't support adding custom sources yet. To make your source plugin appear in the Jitsu Configurator UI, please create a new ticket or pull request in the jitsu repository.

More Details

In the previous sections we reviewed the process of creating a source, using the Airtable source as an example. That implementation doesn't cover all the features and modes a Source Extension can use. Here we give more details on stream sink modes, data type conversions, persistent state management and more.

Message types

A Source Extension communicates with Jitsu Server by means of a StreamSink object and messages of the JitsuDataMessage type. Jitsu supports the following message types:

| Type | StreamSink helper method | Mode | Payload | Description |
|---|---|---|---|---|
| record | addRecord | both | DataRecord | The main message. Sends a single data object to the stream. |
| clear_stream | clearStream | incremental | no | Used when an incremental Source needs to clear all previous data. Truncates the table associated with the stream in target destinations. When used, clear_stream must be the first message in a transaction and can be used only once per stream. |
| state | changeState | both | any object | Stores the provided object in persistent storage. |
| new_transaction | newTransaction | incremental | no | Starts a new transaction. The previous transaction will be committed to the destination. All subsequent messages will be processed in the new transaction. The first transaction is started implicitly even in the absence of a new_transaction message. |
| delete_records | deleteRecords | incremental | DeleteRecords | Deletes records that belong to a certain time period from destination storages. The time period is controlled by the partitionTimestamp and granularity parameters of DeleteRecords. Requires DataRecord objects to have $recordTimestamp set. delete_records must precede any record messages in a transaction. |
| log | log | both | LogRecord | Prints a log message to the Jitsu server task log. |
| schema | schema | both | StreamSchema | Defines the stream schema. Not implemented yet. |
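
The ordering rules for clear_stream and delete_records can be checked mechanically, for example in a unit test that collects the messages a reader emits. The sketch below is an illustration only: the `Message` shape and `checkMessageOrder` are hypothetical and much simpler than the real JitsuDataMessage type. It also assumes the strictest reading of "first message in a transaction", where any earlier message, including log, counts.

```typescript
// Hypothetical, simplified message shape; the real type lives in @jitsu/types.
type MessageType = "record" | "clear_stream" | "state" | "new_transaction" | "delete_records" | "log";
type Message = { type: MessageType; message?: unknown };

// Returns a list of rule violations; an empty list means the order is valid.
function checkMessageOrder(messages: Message[]): string[] {
  const errors: string[] = [];
  let seenInTransaction = 0; // messages seen since the current transaction started
  let recordSeen = false; // whether the current transaction already contains a record
  let clearStreamUsed = false; // clear_stream is allowed only once per stream

  messages.forEach((m, index) => {
    if (m.type === "new_transaction") {
      // A new transaction resets the per-transaction counters
      seenInTransaction = 0;
      recordSeen = false;
      return;
    }
    if (m.type === "clear_stream") {
      if (clearStreamUsed) errors.push(`#${index}: clear_stream used more than once`);
      if (seenInTransaction > 0) errors.push(`#${index}: clear_stream is not first in transaction`);
      clearStreamUsed = true;
    }
    if (m.type === "delete_records" && recordSeen) {
      errors.push(`#${index}: delete_records after a record message`);
    }
    if (m.type === "record") recordSeen = true;
    seenInTransaction++;
  });
  return errors;
}
```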

Sync modes

Each stream type can be declared to support full_sync mode, incremental mode, or both. If both modes are supported, the 'mode' parameter in the StreamConfiguration object must be set to full_sync or incremental. Supported modes are declared in the supportedModes property of the StreamInstance objects returned by sourceCatalog.

Full Sync

In Full Sync mode a Source Extension must download all data for the stream on each run.

Full Sync mode is recommended for most Sources. The exceptions are sources with so much data that it takes a long time to process, and services that charge for pulling data. In such cases Incremental mode may be a good choice despite being a little harder to implement.

Incremental

In Incremental mode a Source Extension downloads only fresh data on each run. Incremental mode usually requires the Source to save some information between runs, e.g. the ID or _timestamp of the last loaded data record. To save data between runs, use the state message. See the Persistent State section for more details.
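
A minimal sketch of the cursor logic behind Incremental mode: keep the timestamp of the newest loaded row as state and load only rows newer than it on the next run. The `Row` and `SyncState` shapes and the `selectFreshRows` function are hypothetical and independent of the Jitsu SDK; in a real reader the returned state would be persisted through the state message.

```typescript
// Hypothetical shapes for illustration; they are not the @jitsu/types definitions.
type Row = { id: string; updatedAt: string }; // updatedAt is an ISO 8601 UTC string
type SyncState = { lastUpdatedAt?: string | undefined };

// Returns rows newer than the saved cursor, plus the state to persist for the next run.
function selectFreshRows(rows: Row[], state: SyncState): { fresh: Row[]; nextState: SyncState } {
  const cursor = state.lastUpdatedAt;
  // ISO 8601 UTC strings in the same format compare correctly as plain strings
  const fresh = rows.filter(r => cursor === undefined || r.updatedAt > cursor);
  // The new cursor is the newest timestamp seen, or the old cursor if nothing is fresh
  const newest = fresh.reduce<string | undefined>(
    (max, r) => (max === undefined || r.updatedAt > max ? r.updatedAt : max),
    cursor
  );
  return { fresh, nextState: { lastUpdatedAt: newest } };
}
```

On the first run the state is empty, so every row is loaded; a second run over the same rows loads nothing and leaves the cursor unchanged.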

Refresh window

Some third-party services may update data during the day, and others may keep updating data for longer periods, such as 30 days. It is still possible to use Incremental mode with such services, but we need to make sure that we delete all previous data for the current refresh window before loading it again. To do that we need to:

  • Set $recordTimestamp for every record object that we send to Jitsu.
  • Process stream data in date chunks (partitions). Choose a granularity; DAY will probably suit most cases.
  • Start a new_transaction for each new date chunk processed.
  • Add a delete_records message at the beginning of each new transaction (including the first one).

You can use Jitsu Google-Analytics source as an example of Refresh Window implementation: https://github.com/jitsucom/jitsu-sdk/blob/main/source-connectors/google-analytics
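
The steps above can be sketched as a function that plans the message sequence for a refresh window with DAY granularity. This is an illustration under stated assumptions, not the Google Analytics implementation: the `SinkMessage` shape, `refreshWindowDays` and `planRefresh` are hypothetical, and a real reader would call the StreamSink helper methods instead of building an array.

```typescript
// Hypothetical, simplified message shapes; the real types are defined in @jitsu/types.
type SinkMessage =
  | { type: "new_transaction" }
  | { type: "delete_records"; partitionTimestamp: Date; granularity: "DAY" }
  | { type: "record"; record: { $id: string; $recordTimestamp: Date } };

const DAY_MS = 24 * 60 * 60 * 1000;

// Lists the UTC day starts covering the last `windowDays` days, ending on the day of `now`.
function refreshWindowDays(now: Date, windowDays: number): Date[] {
  const today = Date.UTC(now.getUTCFullYear(), now.getUTCMonth(), now.getUTCDate());
  const days: Date[] = [];
  for (let i = windowDays - 1; i >= 0; i--) {
    days.push(new Date(today - i * DAY_MS));
  }
  return days;
}

// Builds one transaction per day; each transaction starts with delete_records.
function planRefresh(
  days: Date[],
  loadDay: (day: Date) => { $id: string; $recordTimestamp: Date }[]
): SinkMessage[] {
  const messages: SinkMessage[] = [];
  days.forEach((day, i) => {
    // The first transaction is implicit, so new_transaction is only needed between chunks
    if (i > 0) messages.push({ type: "new_transaction" });
    messages.push({ type: "delete_records", partitionTimestamp: day, granularity: "DAY" });
    for (const record of loadDay(day)) messages.push({ type: "record", record });
  });
  return messages;
}
```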

Persistent State

It may be useful to make some information from a previous source run available to subsequent runs. For that purpose a Source Extension may send any custom object as the payload of a state message. Jitsu Server persists that object in meta storage and makes it available to the streamReader via StateService. We recommend using StateService for modifying state too, instead of sending the state message directly.

To test that your source works correctly with state, you can pass the path of a file containing the state object to the execute script using the -t parameter:

yarn execute -c '{apiKey: "keyMy", baseId:"app1234"}' -s '{stream: "table", tableId:"tblMy"}'  -t ./state.json

Record $id

The $id property must be set for each record.
Usually source data has some ID associated with its entities. When this is not the case, you can use the helper function buildSignatureId from @jitsu/jlib/lib/sources-lib to generate an ID as a hash of the object data.
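
To illustrate the idea of a content-based ID, here is a minimal sketch that hashes a key-sorted serialization of the record with SHA-256. It is a stand-in written for this example, not the implementation of buildSignatureId, whose exact algorithm and signature are not shown here; `stableStringify` and `signatureId` are hypothetical names.

```typescript
import { createHash } from "node:crypto";

// Serializes a value with object keys sorted, so key order does not affect the result.
function stableStringify(value: unknown): string {
  if (value instanceof Date) return JSON.stringify(value);
  if (Array.isArray(value)) {
    return "[" + value.map(v => stableStringify(v)).join(",") + "]";
  }
  if (value !== null && typeof value === "object") {
    const obj = value as Record<string, unknown>;
    const parts = Object.keys(obj)
      .sort()
      .map(k => JSON.stringify(k) + ":" + stableStringify(obj[k]));
    return "{" + parts.join(",") + "}";
  }
  // JSON.stringify returns undefined for undefined input, so fall back to "null"
  return JSON.stringify(value) ?? "null";
}

// Hypothetical stand-in for a signature id: SHA-256 over the key-sorted serialization.
function signatureId(record: Record<string, unknown>): string {
  return createHash("sha256").update(stableStringify(record)).digest("hex");
}
```

Two records with the same data get the same ID regardless of key order, so reloading unchanged data produces stable IDs.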

Flattening

A Source Extension is allowed to use an object with nested objects as a DataRecord. If the target destination is a SQL-based warehouse, Jitsu will automatically convert the record to a flat structure. Otherwise, the object structure will be left intact. If you want to force a flat structure for all warehouses, you can use the flatten function from the @jitsu/jlib module.
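
What flattening means can be shown with a small sketch. It is not the flatten function from @jitsu/jlib: `flattenRecord` is a hypothetical helper, and the "_" separator and the choice to keep arrays and dates as values are assumptions made for this example.

```typescript
type Flat = Record<string, unknown>;

// Flattens nested plain objects into one level, joining key paths with a separator.
// Arrays, dates and null are kept as values; the "_" separator is an assumption.
function flattenRecord(input: Record<string, unknown>, separator = "_", prefix = ""): Flat {
  const out: Flat = {};
  for (const [key, value] of Object.entries(input)) {
    const path = prefix ? prefix + separator + key : key;
    const isPlainObject =
      value !== null && typeof value === "object" && !Array.isArray(value) && !(value instanceof Date);
    if (isPlainObject) {
      // Recurse into nested objects, carrying the key path as the prefix
      Object.assign(out, flattenRecord(value as Record<string, unknown>, separator, path));
    } else {
      out[path] = value;
    }
  }
  return out;
}
```

For example, a record with a nested "Project lead" object that has id and name properties would end up with the keys "Project lead_id" and "Project lead_name".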

Data types mapping

Jitsu infers the destination type of each data record field from its original type, with one exception: dates represented as strings in the simplified extended ISO format YYYY-MM-DDTHH:mm:ss.sssZ. Such strings are converted to the Date type in the destination storage.
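
In JavaScript, Date.prototype.toISOString() produces exactly this format, so sending dates through it is a simple way to get them recognized. The sketch below shows how a string can be checked against the format before sending; `looksLikeIsoDate` is a hypothetical helper, and how Jitsu detects such strings internally is not covered here.

```typescript
// Matches the simplified extended ISO format YYYY-MM-DDTHH:mm:ss.sssZ exactly.
const ISO_UTC = /^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z$/;

// True when the value is a string in that format that also parses to a valid date.
function looksLikeIsoDate(value: unknown): boolean {
  return typeof value === "string" && ISO_UTC.test(value) && !Number.isNaN(Date.parse(value));
}
```

A date-only string such as "2020-11-01" does not match the format, so under this check it would stay a plain string.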

Date timezone

Jitsu runs Sources with the default timezone set to UTC.