Advanced

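This page describes advanced details of Jitsu Functions: return types, order of execution, multiplying events, SQL column type overrides, error handling and retries, and built-in functions.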

Return types

A function can return one of the following:

  • undefined (or a missing return statement) – the event is passed to the next function in the pipeline without changes.
  • "drop", null, [], false – the event is dropped: it is not passed to the next function in the pipeline or to the destination.
  • object – the event is replaced with the returned object and passed to the next function in the pipeline.
  • array of objects – the event is replaced with the returned array and each object is passed to the destination. This way multiple events can be generated from one event. See the Multiplying events section for details.

A function can also throw errors. See the Errors handling section for details.
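
The return types above can be sketched in a single function. This is a minimal illustration, not code from Jitsu: the name returnTypesDemo and the mapping from event.type to each outcome are assumptions made for the example. In a real function the body would live in the module's export default function.

```javascript
// Illustrative only: one function that exercises each documented return type.
// `returnTypesDemo` and the event.type mapping are hypothetical.
function returnTypesDemo(event) {
  if (event.type === "page") {
    // No return value: the event continues to the next function unchanged.
    return;
  }
  if (event.type === "identify") {
    // "drop" (or null, [], false): the event is dropped.
    return "drop";
  }
  if (event.type === "track") {
    // A single object replaces the event.
    return { ...event, processed: true };
  }
  // An array of objects produces several events from one.
  return [
    { ...event, copy: 1 },
    { ...event, copy: 2 },
  ];
}
```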

Order of execution

Multiple functions can be attached to the same connection. In that case, the functions are executed in the order chosen in the Connection Editor, and each function receives the result of the previous one as input, unless a function returns "drop" or throws a corresponding error.
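
To make the chaining rule concrete, here is a small local model of a pipeline. It is a sketch for reasoning about function order, not Jitsu's implementation: runPipeline, isDrop and the sample functions are hypothetical names, the functions are synchronous for brevity, and error handling and event multiplication are left out.

```javascript
// Hypothetical local model of a function pipeline; not Jitsu's actual runtime.
function isDrop(result) {
  // "drop", null, false and an empty array all mean "drop the event".
  return (
    result === "drop" ||
    result === null ||
    result === false ||
    (Array.isArray(result) && result.length === 0)
  );
}

function runPipeline(functions, event) {
  let current = event;
  for (const fn of functions) {
    const result = fn(current);
    if (result === undefined) continue; // no return value: event is unchanged
    if (isDrop(result)) return null; // dropped: later functions are not called
    current = result; // the next function receives this result as input
  }
  return current;
}

// Usage: the second function sees the field added by the first one.
const tagSource = (e) => ({ ...e, source: "web" });
const skipInternal = (e) => (e.source === "internal" ? "drop" : undefined);
console.log(runPipeline([tagSource, skipInternal], { id: 1 }));
```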

Multiplying events

You can produce multiple events from one event by returning an array of objects from the function.

Caution

One connection can have only one function that multiplies events (returns an array with more than one object), and that function must be the last in the pipeline.

If a function that is not the last one in the pipeline returns an array with more than one object, processing is aborted with an error and no events are passed to the destination.

Example: create separate event for each product in purchase:

export default async function(event, { log, fetch }) {
  if (event.event == "purchase" && event.properties?.products?.length > 0) {
    let results = []
    for (const product of event.properties.products) {
      results.push({
        event_type: "purchase",
        product_id: product.id,
        price: product.price
      })
    }
    return results
  } else {
    //skip events without any purchase
    return "drop"
  }
}

SQL column type override

To set a specific SQL column type for a field, use the __sql_type_<field_name> key in the returned object:

export default async function(event, ctx) {
  return {
    ...event,
    event_date: event.timestamp,
    __sql_type_event_date: "date",
    event_time: event.timestamp,
    __sql_type_event_time: "time",
  }
}

Any nested object can be stored as JSON in data warehouses that support the JSON format:

export default async function(event, ctx) {
  return {
    ...event,
    user_traits_json: { ...event.context.traits, ...event.traits },
    __sql_type_user_traits_json: "json"
  }
}

SQL types with extra parameters:

Some data warehouses support extra parameters for column types during table creation. For such cases, use a two-element array to provide the data type and the column type separately:

export default async function(event, ctx) {
  return {
    ...event,
    title: event.context.page.title,
    __sql_type_title: ["varchar(256)", "varchar(256) encode zstd"]
  }
}
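
When several fields need a type override, the key convention can be wrapped in a small helper. This is only a convenience sketch: withSqlType is a hypothetical name, not part of Jitsu, and it does nothing beyond setting the field and its __sql_type_<field_name> companion key as described above.

```javascript
// Hypothetical helper (not a Jitsu API): sets a field together with its
// "__sql_type_<field_name>" companion key and returns a new object.
function withSqlType(record, field, value, sqlType) {
  return {
    ...record,
    [field]: value,
    [`__sql_type_${field}`]: sqlType,
  };
}

// Usage: same result as writing both keys by hand.
const sampleEvent = { timestamp: "2024-01-01T10:00:00Z" };
const sampleRow = withSqlType(sampleEvent, "event_date", sampleEvent.timestamp, "date");
console.log(sampleRow.__sql_type_event_date); // prints "date"
```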

Errors handling

If a function doesn't throw an error, Jitsu considers the run successful.

If a function throws an error, Jitsu considers the run failed.

The error will be logged and can be inspected in Data - Live Events section of the Workspace.
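
As a minimal sketch, a function can fail its run by throwing a regular Error. The name requireUserId and the userId and messageId fields are assumptions chosen for illustration; adapt them to the shape of your events.

```javascript
// Illustrative only: throwing a plain Error marks the function run as failed.
// `requireUserId`, `userId` and `messageId` are assumptions for this example.
function requireUserId(event) {
  if (!event.userId) {
    // The message is what appears in the logged error.
    throw new Error(`Event ${event.messageId} has no userId`);
  }
  return event;
}
```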

Errors retry

By default, failed functions don't interrupt the functions pipeline and don't get retried. For example, if functions A and B are attached to the same connection and A fails, B will still be executed and the event will reach the destination.

This behavior can be changed by throwing an error of the special type RetryError:

throw new RetryError(errorMessage, options);

where options is an object with the following properties:

  • drop – boolean flag that indicates whether to drop the event (equivalent to a drop response). If true, the event is dropped and is not processed by any other function. If false, the event is passed to the next function in the pipeline (event modifications made by the failed function before throwing the error are lost). Default: false

For example:

throw new RetryError("Failed to enrich event", { drop: true });

Caution

When multiple user-defined functions are attached to the same connection and one of them throws a RetryError, all user-defined functions will be retried.

If just one of the functions throws a RetryError with the drop parameter set to true, the event will be dropped and won't reach the destination.

Retry attempts and delay

By default, when a function throws a RetryError, Jitsu retries it no more than 3 times, with delays of 10, 100 and 1000 minutes.

You can change the default retry attempts and delays by adding the following config block to the function code:

export const config = {
  retryPolicy: {
    attempts: 2, // retry attempts. cannot be greater than 3
    delays:  [60, 1440] // delay in minutes before each subsequent attempt. cannot be greater than 1440
  }
}
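
The limits mentioned in the comments above can be checked locally before deploying a function. The sketch below is not part of Jitsu: validateRetryPolicy is a hypothetical helper, and it assumes the 1440-minute limit applies to each individual delay.

```javascript
// Hypothetical pre-deploy check; limits are taken from the comments in the
// config example. Assumes the 1440-minute cap applies to every single delay.
const MAX_ATTEMPTS = 3;
const MAX_DELAY_MINUTES = 1440;

function validateRetryPolicy(policy) {
  const problems = [];
  if (!Number.isInteger(policy.attempts) || policy.attempts > MAX_ATTEMPTS) {
    problems.push(`attempts must be an integer not greater than ${MAX_ATTEMPTS}`);
  }
  const delays = Array.isArray(policy.delays) ? policy.delays : [];
  delays.forEach((delay, index) => {
    if (typeof delay !== "number" || delay > MAX_DELAY_MINUTES) {
      problems.push(`delays[${index}] must be a number not greater than ${MAX_DELAY_MINUTES}`);
    }
  });
  return problems; // an empty array means no problems were found
}

console.log(validateRetryPolicy({ attempts: 2, delays: [60, 1440] })); // no problems reported
console.log(validateRetryPolicy({ attempts: 5, delays: [60, 3000] })); // two problems reported
```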

Example

Let's say we have the following logical parts in our pipeline:

  • filter - filters events by some condition.
  • enrich - enriches the event with crucial data using a fetch request to an external service API. It has no side effects and can be repeated, but the step is critical: events without enrichment have no business value.
  • business logic - increments a business metric using a fetch request to an external service API. It has a side effect and cannot be repeated, because retrying would lead to an incorrect metric value.

We need to organize our functions in such a way that, even after retries, the business logic block runs successfully only once. In general, a function with a side effect should be the last one in the connection:

filter:

export default async function(event, { log, fetch }) {
  if (event.type !== "track") {
    // this destination accepts only track events all other types must be dropped
    return "drop";
  }
}

enrich:

export const config = {
  retryPolicy: {
    attempts: 2, 
    delays:  [60, 1440] // 1st delay - 1 hour, 2nd delay - 1 day
  }
};
 
export default async function(event, { log, fetch }) {
  let error;
  // declared outside `try` so that the `catch` block can reference it
  const url = `https://ip-api.com/json/${event.context.ip}`;
  try {
    const result = await fetch(url);
    if (result.ok) {
      const geo = await result.json();
      if (geo.status === "success") {
        // remove status fields
        delete geo.query;
        delete geo.status;
        // add geo information to context
        event.context.geo = geo;
      } else {
        error = `GeoIP status: ${geo.status}: ${geo.message}`;
      }
    } else {
      error = `Failed to fetch GeoIP data ${url}: ${result.status} ${result.statusText}`;
    }
  } catch (e) {
    error = `Failed to fetch response from ${url}: ${e?.message}`;
  }
  if (error) {
    // enrich is a critical step: if it fails, we need to drop the event and schedule a retry,
    // so `drop` is set to true
    throw new RetryError(error, { drop: true });
  }
  return event;
}

business logic:

export const config = {
  retryPolicy: {
    attempts: 2,
    delays:  [60, 1440] // 1st delay - 1 hour, 2nd delay - 1 day
  }
};
 
export default async function(event, { log, fetch }) {
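  // `error` and `metricUrl` are declared outside `try` so they are visible after it
  let error;
  const metricUrl = "https://rareanimals.example.com/increment";
  try {
    const bisResult = await fetch(metricUrl, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      // the request body must be serialized explicitly
      body: JSON.stringify({
        country: event.context.geo.country,
        animal: event.properties.animal
      })
    });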
    if (!bisResult.ok) {
      error = `Failed to increment metric ${metricUrl}: ${bisResult.status} ${bisResult.statusText}`;
    }
  } catch (e) {
    error = `Failed to increment metric: ${e?.message}`;
  }
  if (error) {
    // the business metric is a critical step: if it fails, we need to schedule a retry,
    // but it is ok to pass the event to the destination since it is already in its final state
    // and Jitsu will handle deduplication after the retry. So we don't set the `drop` flag
    throw new RetryError(error);
  }
  // pass enriched event to the destination
  return event;
}

Built-in functions

Jitsu has certain features implemented as built-in functions:

  • All data warehouse destinations use the same bridge function that passes the event payload to the bulker component.
  • API-based destinations (e.g. Mixpanel, Amplitude, Webhook, etc.) are implemented as built-in functions.
  • The Identity Stitching feature is also implemented as a built-in function.

Since they are functions like any user-defined function, they report logs and errors in a similar way. You can check logs and errors of the functions attached to a connection in the Data - Live Events section of the Workspace.