
Pipelines

As mentioned earlier, Jitsu supports two destination modes, and data processing depends on which mode is used:

Batch processing

  • First, an event is written to the current log file in the events/incoming directory
  • Log files are rotated every N minutes (5 by default) and processed in a separate thread
  • Log file processing. Get all unprocessed logs (all files in events/incoming that are not currently being processed)
    • Multiplex records to destinations
    • For each destination: evaluate the table_name_template expression to get a destination table name (see the first sketch after this list). If the result is an empty string, skip this destination. If evaluation fails, the event is written to events/failed
    • For each destination/table pair:
      • Check the pair's status in the log's status file. If it has already been processed, skip it
      • Apply the LookupEnrichment step
      • Apply the MappingStep (producing a BatchHeader)
      • Maintain an up-to-date BatchHeader in memory. If a new field appears, add it and its type to the BatchHeader
      • On type conflict, apply Type Promotion (see the second sketch after this list)
    • Once the batch objects and the BatchHeader are prepared, proceed to table patching (see the third sketch after this list). For each destination/table pair:
      • Map the BatchHeader into a Table structure with SQL column types that depend on the destination type, plus primary keys (if configured)
      • Get Table structure from the destination
      • Acquire destination lock (using distributed lock service)
      • Compare the two Table structures (one from the destination, one derived from the data)
      • Maintain the primary key (if configured)
      • If a column is missing, run ALTER TABLE to add it
      • If a column is present in the table but missing from the BatchHeader, ignore it
      • Release destination lock
    • Depending on the destination, either bulk-insert objects into the destination with explicit typecasts (if configured in the mapping section), or write them with JSON/CSV serialization to cloud storage and execute the destination's load command
    • On success, update the log's status file and mark the destination/table pair as OK; otherwise mark it as FAILED. Once all pairs are marked OK, rotate the log file to events/archive
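
The table name evaluation step is where events get routed, skipped, or failed. Below is a minimal sketch of that step in Go, assuming a text/template-style expression; the Event type, the event_type field, and the error routing are simplified assumptions, not Jitsu's actual implementation:

```go
package main

import (
	"fmt"
	"strings"
	"text/template"
)

// Event is a hypothetical representation of a single incoming event.
type Event map[string]interface{}

// evalTableName renders a table_name_template expression against an event.
// An empty result means "skip this destination"; a render error means the
// event should be routed to events/failed.
func evalTableName(tmpl *template.Template, e Event) (string, error) {
	var sb strings.Builder
	if err := tmpl.Execute(&sb, e); err != nil {
		return "", err // caller writes the event to events/failed
	}
	return strings.TrimSpace(sb.String()), nil
}

func main() {
	// Hypothetical template: route events to a table named after event_type.
	tmpl := template.Must(template.New("table").Parse("events_{{.event_type}}"))

	for _, e := range []Event{{"event_type": "pageview"}, {"event_type": "signup"}} {
		name, err := evalTableName(tmpl, e)
		if err != nil || name == "" {
			continue // skip this destination, or route to events/failed on error
		}
		fmt.Println("destination table:", name)
	}
}
```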
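The BatchHeader bookkeeping and Type Promotion can be sketched like this; the DataType ordering and the BatchHeader shape here are simplified assumptions, not Jitsu's real type system, which distinguishes more types and destination-specific rules:

```go
package main

import "fmt"

// DataType is a hypothetical ordered set of types used for promotion:
// a conflict between two types resolves to the "wider" one.
type DataType int

const (
	Int64 DataType = iota
	Float64
	String // widest: any value can be represented as a string
)

// BatchHeader accumulates the union of all field names and their
// (possibly promoted) types across a batch.
type BatchHeader map[string]DataType

// Add registers a field, promoting the stored type on conflict.
func (h BatchHeader) Add(field string, t DataType) {
	if existing, ok := h[field]; ok && existing != t {
		// Type Promotion: keep the wider of the two types.
		if t > existing {
			h[field] = t
		}
		return
	}
	h[field] = t
}

func main() {
	h := BatchHeader{}
	h.Add("user_id", Int64)
	h.Add("user_id", String) // conflict: promoted to String
	h.Add("price", Float64)
	fmt.Println(h) // map[price:1 user_id:2]
}
```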
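Table patching then reduces to diffing the Table derived from the data against the Table fetched from the destination and emitting ALTER TABLE statements for the missing columns, all while holding the destination lock. A minimal sketch with a hypothetical Table type and string SQL types:

```go
package main

import "fmt"

// Table is a hypothetical destination table schema: column name -> SQL type.
type Table struct {
	Name    string
	Columns map[string]string
}

// Diff compares the schema derived from data with the schema fetched from
// the destination and returns the ALTER TABLE statements needed. Columns
// present in the destination but absent from the data are ignored.
func Diff(fromData, fromDWH Table) []string {
	var ddl []string
	for col, sqlType := range fromData.Columns {
		if _, exists := fromDWH.Columns[col]; !exists {
			ddl = append(ddl, fmt.Sprintf(
				"ALTER TABLE %s ADD COLUMN %s %s", fromDWH.Name, col, sqlType))
		}
	}
	return ddl
}

func main() {
	data := Table{Name: "events", Columns: map[string]string{"user_id": "TEXT", "price": "DOUBLE PRECISION"}}
	dwh := Table{Name: "events", Columns: map[string]string{"user_id": "TEXT", "legacy_col": "TEXT"}}
	for _, stmt := range Diff(data, dwh) {
		fmt.Println(stmt) // ALTER TABLE events ADD COLUMN price DOUBLE PRECISION
	}
}
```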

Stream processing

  • Apply multiplexing and put each multiplexed event into its destination queue. Queue items are persisted in the events/queue.dst=${destination_id} directory.
  • A separate thread processes each queue. For each event (see the sketch after this list):
    • Run the table_name_template expression. If the result is an empty string, skip the event. If evaluation fails, the event is written to events/failed
    • Apply the LookupEnrichment step
    • Apply the MappingStep (producing a BatchHeader)
    • Get the Table structure from memory (on a cache miss, fetch the schema from the DWH)
    • Apply table patching (see the batch steps above)
    • Insert the object with explicit typecasts (if configured in the mapping section) using an INSERT statement
    • If the INSERT fails, refresh the schema from the DWH and retry
    • If it fails again, write the record to events/failed
    • On success, write the event to events/archive
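
The stream pipeline's retry-on-stale-schema behavior can be sketched as follows; insertFn and refreshSchema are hypothetical stand-ins for a destination adapter's typed INSERT and DWH schema fetch, not Jitsu's actual API:

```go
package main

import (
	"errors"
	"fmt"
)

// insertFn is a hypothetical stand-in for a destination's typed INSERT.
type insertFn func(event map[string]interface{}) error

// processStreamEvent mirrors the retry logic described above: if the
// INSERT fails (e.g. because the cached schema is stale), refresh the
// schema from the DWH and retry once; a second failure sends the event
// to events/failed, success sends it to events/archive.
func processStreamEvent(event map[string]interface{}, insert insertFn, refreshSchema func() error) error {
	if err := insert(event); err != nil {
		if rerr := refreshSchema(); rerr != nil {
			return rerr // -> events/failed
		}
		if err := insert(event); err != nil {
			return err // -> events/failed
		}
	}
	return nil // -> events/archive
}

func main() {
	stale := true
	insert := func(e map[string]interface{}) error {
		if stale {
			return errors.New(`column "price" does not exist`)
		}
		return nil
	}
	refresh := func() error { stale = false; return nil } // simulates a DWH schema refresh
	fmt.Println(processStreamEvent(map[string]interface{}{"price": 9.99}, insert, refresh)) // <nil>
}
```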