Problem and constraints

PostHog provides apps for importing, exporting and transforming data.

Users of these apps want to know whether the apps are reliable, and they need tooling to debug errors. App metrics exists to provide both.

When designing the schema, we needed ingestion of these stats to be as 'cheap' as possible. On the flip side, queries against the data did not need to support much beyond time-range filtering.

Schema

SQL
CREATE TABLE sharded_app_metrics
(
    team_id Int64,
    timestamp DateTime64(6, 'UTC'),
    plugin_config_id Int64,
    category LowCardinality(String),
    job_id String,
    -- Counters: summed when rows with the same sorting key are merged
    successes SimpleAggregateFunction(sum, Int64),
    successes_on_retry SimpleAggregateFunction(sum, Int64),
    failures SimpleAggregateFunction(sum, Int64),
    -- Error data, stored alongside the counters
    error_uuid UUID,
    error_type String,
    error_details String CODEC(ZSTD(3)),
    -- Kafka metadata
    _timestamp DateTime,
    _offset UInt64,
    _partition UInt64
)
ENGINE = ReplicatedAggregatingMergeTree('/clickhouse/tables/{shard}/posthog.sharded_app_metrics', '{replica}')
PARTITION BY toYYYYMM(timestamp)
ORDER BY (team_id, plugin_config_id, job_id, category, toStartOfHour(timestamp), error_type, error_uuid);
Supporting tables:
SQL
CREATE TABLE app_metrics
(
    team_id Int64,
    timestamp DateTime64(6, 'UTC'),
    plugin_config_id Int64,
    category LowCardinality(String),
    job_id String,
    successes SimpleAggregateFunction(sum, Int64),
    successes_on_retry SimpleAggregateFunction(sum, Int64),
    failures SimpleAggregateFunction(sum, Int64),
    error_uuid UUID,
    error_type String,
    error_details String CODEC(ZSTD(3)),
    _timestamp DateTime,
    _offset UInt64,
    _partition UInt64
)
ENGINE = Distributed('posthog', 'posthog', 'sharded_app_metrics', rand());

CREATE MATERIALIZED VIEW app_metrics_mv
TO posthog.sharded_app_metrics
AS SELECT
    team_id,
    timestamp,
    plugin_config_id,
    category,
    job_id,
    successes,
    successes_on_retry,
    failures,
    error_uuid,
    error_type,
    error_details
FROM posthog.kafka_app_metrics;

CREATE TABLE kafka_app_metrics
(
    team_id Int64,
    timestamp DateTime64(6, 'UTC'),
    plugin_config_id Int64,
    category LowCardinality(String),
    job_id String,
    successes Int64,
    successes_on_retry Int64,
    failures Int64,
    error_uuid UUID,
    error_type String,
    error_details String CODEC(ZSTD(3))
)
ENGINE = Kafka('kafka:9092', 'clickhouse_app_metrics_test', 'group1', 'JSONEachRow');

Decision: Store errors in the same table as metrics

Error tracking is fundamentally different from metrics, but we wanted to keep failure counts and the errors we have data on from going out of sync.

For this reason, the two are stored in the same table, with the error_details column containing JSON-encoded metadata about the error, including the relevant event payload and stack trace.

This runs the risk of data storage increasing significantly if a lot of large errors occur.

To limit that risk, the error_details column uses the ZSTD(3) codec.

Sorting by error_type also matters here: the error_details values of a given error type should be similar to each other and therefore compress well.

In the future, we might introduce TTLs for the error columns if storage becomes a problem or periodically wipe error data in other ways.
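
If storage did become a problem, a column-level TTL is one possible approach. The statement below is only a sketch and not something the schema currently does: the three-month retention period is an arbitrary assumption, and in a sharded, replicated setup it would need to be applied on every shard. ClickHouse does not allow TTLs on key columns, so the sketch targets only error_details (the bulky column); error_type and error_uuid are part of the ORDER BY and would be kept.

```sql
-- Hypothetical: reset error_details to its default (an empty string) once a
-- row is older than 3 months. The retention period is an assumed value.
ALTER TABLE sharded_app_metrics
    MODIFY COLUMN error_details String CODEC(ZSTD(3))
    TTL toDateTime(timestamp) + INTERVAL 3 MONTH;
```

Expired values are cleared when parts are merged, so the space is reclaimed gradually rather than immediately.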

Decision: pre-aggregate metrics in app in memory

Apps act on events as they're processed and users might have dozens of apps installed at the same time.

For this reason, emitting a Kafka message per app per event ingested ends up being too expensive. We instead aggregate metrics (and errors) in memory and only periodically flush data to Kafka.

The trade-off is that counts can be subtly off after deploys or restarts, because metrics still held in memory at that point may never be flushed. If this becomes a significant user concern, we may reduce the precision of the numbers shown in the UI.

Decision: using AggregatingMergeTree

To make ingesting and storing this data cheaper, AggregatingMergeTree is used.

Each time two parts are merged, rows with identical ORDER BY values are collapsed into a single row in the new part.

In this setup, this means that:

  • we aggregate data to hourly granularity (since toStartOfHour(timestamp) is in the ORDER BY)
  • successes, successes_on_retry and failures columns get summed up for each unique value of ORDER BY
  • errors are not aggregated at all (since error_uuid is in the ORDER BY)

Even with all of this, queries still need to sum the values, because merges are not guaranteed to happen and several rows with the same ORDER BY values may exist at query time.
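
As an illustration, a query for hourly totals might look like the sketch below. The team_id and plugin_config_id values and the seven-day window are placeholder assumptions. The counters are wrapped in sum() and grouped explicitly instead of being read directly, so the result is the same whether or not the underlying parts have been merged.

```sql
SELECT
    toStartOfHour(timestamp) AS hour,
    sum(successes) AS total_successes,
    sum(successes_on_retry) AS total_successes_on_retry,
    sum(failures) AS total_failures
FROM app_metrics
WHERE team_id = 1                            -- placeholder team
  AND plugin_config_id = 42                  -- placeholder app config
  AND timestamp >= now() - INTERVAL 7 DAY    -- assumed time range
GROUP BY hour
ORDER BY hour;
```

Replacing toStartOfHour with toStartOfDay would give daily totals from the same rows.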

Decision: sharding

To make data cheaper to store, this table is sharded.

Results

On US Cloud, the disk size of this table was 6 MB after aggregating nearly 2 billion metrics. For comparison, storing a similar number of events can require hundreds of gigabytes.
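
One way to check a figure like this is to query ClickHouse's system.parts table. The sketch below assumes the database is named posthog, as in the schema above. It reports only the node it runs on, so on a sharded cluster the numbers would have to be collected per shard and added up.

```sql
SELECT
    formatReadableSize(sum(bytes_on_disk)) AS disk_size,
    sum(rows) AS stored_rows
FROM system.parts
WHERE database = 'posthog'
  AND table = 'sharded_app_metrics'
  AND active;    -- ignore parts that have already been merged away
```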

Queries against this schema are also usually measured in milliseconds.

We were able to leverage pre-aggregation to this extent because we only needed to answer a few questions:

  • What is the number of successes and failures in a given time range per day or hour, diced by plugin, method and job?
  • How many errors of each type did we see in that time range?
  • What are some examples of specific errors we saw?
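
The last two questions, for instance, could be answered with queries along these lines. This is a sketch under assumptions: the filter values are placeholders, 'TypeError' is a hypothetical error type, and rows that carry no error are assumed to have an empty error_type.

```sql
-- Number of recorded errors per type in the time range.
SELECT
    error_type,
    count() AS error_count,
    max(timestamp) AS last_seen
FROM app_metrics
WHERE team_id = 1                            -- placeholder team
  AND plugin_config_id = 42                  -- placeholder app config
  AND timestamp >= now() - INTERVAL 7 DAY    -- assumed time range
  AND error_type != ''                       -- assumption: metric-only rows leave this empty
GROUP BY error_type
ORDER BY error_count DESC;

-- A few recent examples of one specific error type.
SELECT
    error_uuid,
    timestamp,
    error_details
FROM app_metrics
WHERE team_id = 1
  AND plugin_config_id = 42
  AND timestamp >= now() - INTERVAL 7 DAY
  AND error_type = 'TypeError'               -- hypothetical error type
ORDER BY timestamp DESC
LIMIT 20;
```

Because error_uuid is part of the ORDER BY, error rows are never collapsed into each other, which is why a plain count() works for the first query.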

These queries all lend themselves well to pre-aggregation, meaning a purpose-built schema could store this data very cheaply at the cost of some flexibility.

Next schema in the ClickHouse manual: person_distinct_id
