An Efficient Online ML Feature Store with Fast Rollbacks

Isaac Joseph
Published in Affirm Tech Blog
10 min read · May 28, 2021


Overview

Say you have a user browsing your online store for products and you want to recommend products based on this user’s browsing history. What are some ways you could gather this information for generating recommendations?

An example feature could be the aggregate count of the number of times the user has viewed a product. Assuming all pertinent data is readily available online at recommendation time, we could compute it as needed by scanning each user’s viewing history.

Calculating this feature repeatedly is costly and inefficient, so we can optimize by regularly precomputing and storing view history objects for each product/user. This article will focus on how we addressed the following questions.

  • Where would this feature be stored?
  • How is it updated?
  • What is the latency for querying this feature during model prediction time?

The Machine Learning Platform Team at Affirm built an Online Feature Store (OFS) using a partitioned Aurora MySQL Database to serve these types of features.

Our architecture is analogous to that used by offline features in the Uber Michelangelo Palette feature store, except with extra features related to financial domain-specific accuracy demands: strong consistency and quick, automated rollbacks to previous batches.

Requirements

We have ~20 million users and want to store view information about the 10 most popular items for each user.

We want this information to be updated with an SLA of once per day. We also want to be able to quickly roll back to previous days’ states to address data pipeline corruption, reaching back one week to allow more subtle issues to be detected by anomaly-detection pipelines.

This gives about 20 million × 7 days = 140 million rows in the database total. Assuming each row is about 200 bytes, this yields 4 GB per day (hot rows) and 28 GB total (24 GB of cold rows).

We want our request latency to this service to be low enough such that it does not impact our existing (parallelized) online critical path process, so < 100 ms.

Our expected read throughput is 50 RPS, bursting to 500 RPS at peak times due to promotional events or holidays. At about 200 bytes per row, that means roughly 10 KB/s with 100 KB/s bursts.

We want to be able to load a day’s worth of data in a reasonable amount of time, say 1 hour, meaning we want about 1 MB/s batch load performance.
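The storage and batch-load figures above can be reproduced with a few lines of arithmetic. This is only a back-of-the-envelope sketch using the approximate numbers quoted in this section (one ~200-byte row per user per day, decimal GB and MB); the constant names are illustrative.

```python
# Back-of-the-envelope sizing; every input is an approximate figure from the text.
USERS = 20_000_000           # ~20 million users, one row per user per daily batch
RETENTION_DAYS = 7           # one week of daily batches kept for rollbacks
ROW_BYTES = 200              # assumed average row size
LOAD_WINDOW_SECONDS = 3600   # target: load one day's batch in about an hour

GB = 10**9
MB = 10**6

total_rows = USERS * RETENTION_DAYS
hot_gb = USERS * ROW_BYTES / GB            # the current day's batch
total_gb = total_rows * ROW_BYTES / GB     # all retained batches
cold_gb = total_gb - hot_gb                # batches kept only for rollback
load_mb_per_s = USERS * ROW_BYTES / LOAD_WINDOW_SECONDS / MB

print(f"rows retained:   {total_rows:,}")
print(f"hot / cold / total: {hot_gb:.0f} / {cold_gb:.0f} / {total_gb:.0f} GB")
print(f"batch load rate: {load_mb_per_s:.2f} MB/s")
```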

We are willing to sacrifice availability for consistency: we’d rather give a null response than an incorrect one as the downstream models can handle this gracefully via imputation. Being in financial services, we have high correctness requirements due to ethics, compliance, and user trust.

Finally, we expect more use cases to emerge, so we want to be able to scale to handle more feature types without overly time-consuming structural overhauls.

This will be implemented as a backend service that communicates with our other decisioning service(s) in a service-oriented architecture environment.

Why Aurora MySQL (vs. DynamoDB/ NoSQL)?

To improve latency, we’d like our data schema to be denormalized (having objects containing all consumer-required data attributes). In the above example, such an object would show a user’s viewing history counts for the top 10 globally most-viewed items. This object could be arbitrarily complicated if computed offline, which could help downstream model accuracy; for example, beyond raw counts of views per item, we might want to perform some kind of aggregation against other users’ data, storing the percentile (over the distribution of all users) of the number of times a user has viewed a category.

These objects will be keyed by a simple identifier (user ID in the above example).

As this is a semi-structured key-value store with no need for complex joins, and one that may grow large and need to scale, a NoSQL solution (e.g. DynamoDB) seems appropriate.

Why not DynamoDB / NoSQL?

However, Affirm has used DynamoDB in the past for similar use cases and found it insufficient due to (1) poor handling of hot keys and (2) additional complexity without observability.

We expect hot keys (rows much more commonly accessed than others), because (1) some users will request recommendations substantially more often than others and (2) we want to only query a day’s worth of data yet keep around historical data for easy (< 1 minute SLA) data rollbacks. DynamoDB’s partitioning scheme historically forced pricing based on the highest-throughput partition, meaning if we partitioned by user, our costs would be artificially inflated due to non-uniformity. Others have used BigTable, which reportedly handles this better, but migrating to GCP for a single service was not worth the added complexity.

For DynamoDB, uniform distribution across partitions is ideal cost-wise and performance-wise, but difficult to achieve in practice.

Furthermore, the “serverless” nature of DynamoDB combined with the relatively opaque API available makes it very difficult to diagnose performance issues when they arise.

To be fair, DynamoDB claims to have recently addressed the hot key pricing, throughput, and observability issues, but there are other reasons to use Aurora MySQL.

Why Aurora MySQL?

MySQL, on the other hand, is well-understood at Affirm. Amazon’s Aurora backend rewrite of the MySQL engine is the sweet spot: just enough added complexity to yield substantial performance gains, yet not so much that it becomes difficult to maintain or demands bespoke knowledge.

I/O cost for Aurora MySQL is based on overall use, not on the busiest partition, and is billed after the fact on actual use rather than on up-front provisioning. Aurora’s decoupling of storage into a cluster volume makes it easy to scale out the cluster for future higher read throughput by adding read replicas, as each replica need only store a cache rather than a full record of the DB’s contents.

MySQL partitioning works well under Aurora for our scheme of hot rows (the current day’s, which are actively accessed) and cold rows (the previous week’s, kept on standby for data rollbacks). We simply LIST partition on an ID corresponding to a day’s worth of rows and, thanks to partition pruning, read queries automatically look up only the clustered index of the partition holding the requested rows.
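To make the one-partition-per-batch idea concrete, here is a minimal sketch of helpers that build the MySQL statements involved. The table name FeatureSet and the column batch_id follow the schema described later in this article; the partition naming convention and the payload column are assumptions for illustration, and the statements are only constructed as strings here, not run against a database.

```python
TABLE = "FeatureSet"  # table name taken from the schema section of this article


def partition_name(batch_id: int) -> str:
    """Hypothetical naming convention: one LIST partition per daily batch."""
    return f"p{int(batch_id)}"


def add_partition_sql(batch_id: int) -> str:
    """Statement that creates the partition which will hold one day's rows."""
    return (
        f"ALTER TABLE {TABLE} ADD PARTITION "
        f"(PARTITION {partition_name(batch_id)} VALUES IN ({int(batch_id)}))"
    )


def drop_partition_sql(batch_id: int) -> str:
    """Statement that discards a whole stale batch without row-by-row deletes."""
    return f"ALTER TABLE {TABLE} DROP PARTITION {partition_name(batch_id)}"


# A read that filters on the partitioning column lets MySQL prune every other
# partition. The %s placeholders are for a DB-API driver; "payload" is a
# hypothetical column name for the serialized feature object.
READ_SQL = (
    f"SELECT payload FROM {TABLE} "
    "WHERE batch_id = %s AND feature_set_name = %s AND generic_key = %s"
)

print(add_partition_sql(20210528))
print(drop_partition_sql(20210514))
```

Keeping the equality filter on batch_id in every read is what makes pruning apply; a query that omitted it would have to consider every partition.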

Furthermore, with Aurora, on-read-replica caching is handled automatically (in the buffer cache), lazily loaded from the backing cluster volume. This means that the most recent day’s batch of rows will naturally end up in the cache, evicting the previous day’s information as needed. Hot-key users are now actually an asset, because they mean our read replica memory does not have to be as big.
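The claim that hot keys help a lazily loaded cache can be illustrated with a toy simulation. This is a simplified row-level LRU cache, not InnoDB's actual page-based buffer pool, and the user counts, cache size, and skew are made-up parameters chosen only to show the effect.

```python
import random
from collections import OrderedDict


class LRUCache:
    """Toy lazily loaded cache: a miss loads the row and may evict the oldest."""

    def __init__(self, capacity: int) -> None:
        self.capacity = capacity
        self._rows: "OrderedDict[int, bool]" = OrderedDict()
        self.hits = 0
        self.misses = 0

    def read(self, key: int) -> None:
        if key in self._rows:
            self._rows.move_to_end(key)         # mark as most recently used
            self.hits += 1
        else:
            self.misses += 1                    # would be a cluster volume read
            self._rows[key] = True
            if len(self._rows) > self.capacity:
                self._rows.popitem(last=False)  # evict least recently used

    @property
    def hit_ratio(self) -> float:
        total = self.hits + self.misses
        return self.hits / total if total else 0.0


def simulate(keys, capacity: int) -> float:
    cache = LRUCache(capacity)
    for key in keys:
        cache.read(key)
    return cache.hit_ratio


rng = random.Random(0)     # fixed seed so the run is repeatable
N_USERS = 10_000
N_READS = 50_000
CAPACITY = 1_000           # the cache holds 10% of all rows
HOT_USERS = N_USERS // 20  # assumed: 5% of users generate 90% of the reads

uniform_reads = [rng.randrange(N_USERS) for _ in range(N_READS)]
skewed_reads = [
    rng.randrange(HOT_USERS) if rng.random() < 0.9 else rng.randrange(N_USERS)
    for _ in range(N_READS)
]

uniform_ratio = simulate(uniform_reads, CAPACITY)
skewed_ratio = simulate(skewed_reads, CAPACITY)
print(f"uniform access hit ratio: {uniform_ratio:.2f}")
print(f"skewed access hit ratio:  {skewed_ratio:.2f}")
```

With the same cache size, the skewed workload keeps its small hot set resident, so far fewer reads fall through to backing storage.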

From Amazon Aurora: Design Considerations for High Throughput Cloud-Native Relational Databases; shows separation of cluster volume (‘logging + storage’) from the database instances themselves, helpful for scaling read throughput.

As our SLA is updating once per day, we can load rows from S3 in batch using the Aurora-specific LOAD DATA FROM S3 command; this is very performant, as it amounts to loading rows from S3 (private AWS object storage) directly into the backing cluster volume.

Finally, the backing cluster volume is automatically scaled out transparently by AWS and not particularly costly, meaning that we don’t have to worry too much about adding too many rows or not removing stale rows, as it will not be costly nor will it affect performance.

Putting this all together in practice for our use case, we see a high buffer cache hit ratio (green), meaning most queries are served directly from the read replica’s memory. Looking at the cluster volume IOPS (cache misses, orange), we see they are generally negligible except during two events: (1) a batch read of the whole database into a separate (Snowflake) datastore for OLAP, which happens daily at 20:00, and (2) a scheduled partition-dropping job, which runs weekly to remove rows and partitions that are older than 14 days (and which reads historical rows to verify that they can be deleted). Finally, we see high write operations (red) at 09:00, which is when that day’s data is loaded into the database in batch.

CloudWatch OFS cache metrics

Service endpoint latency remains very low (~22 ms p98, ~50 ms max per DB query) despite ~50 million rows (30–40 GB) in the service’s database.

Endpoint latency distribution (including service application logic and Aurora queries)

We only needed a single read replica (in addition to the writer, both relatively-small db.r3.xlarge) to achieve this.

IDOL: Custom Protobuf

Protobuf is a good candidate to store our denormalized data, due to the following properties:

  1. It is language-agnostic; Affirm uses Kotlin in addition to Python.
  2. It has performant ser/des; we found that using Protobuf instead of the library we used previously resulted in a 40% decrease in p99 deserialization latency in some cases.
  3. It works well with our custom RPC system, which already uses a Protobuf wire format. This means our objects can be directly injected into the body of an RPC request without additional ser/des, minimizing CPU overhead.

However, Protobuf has a few shortcomings:

The first is that it automatically imputes missing field values to a default value based on the field type [1] [2]; thus, it’s impossible to tell whether a field was missing or actually set to a default value. This is particularly troublesome in an ML context, since we want to be able to distinguish actually missing data in order to apply appropriate imputation logic and correctly compute statistics. Numeric types, for example, have a default value of 0, which would cause a lot of trouble if we were to add a new field representing a count or a dollar amount, say.
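The effect of silently defaulted values on statistics can be seen in a small plain-Python analogy. This is not Protobuf's API and not IDOL; the two dataclasses below are hypothetical stand-ins for a message type whose missing field is defaulted to 0 versus one that keeps "not set" explicit.

```python
from dataclasses import dataclass
from statistics import mean
from typing import Optional


@dataclass
class DefaultingCounts:
    view_count: int = 0                # a missing value silently becomes 0


@dataclass
class ExplicitCounts:
    view_count: Optional[int] = None   # a missing value stays distinguishable


# Two of the four records never had view_count populated upstream.
raw_records = [{"view_count": 4}, {}, {"view_count": 6}, {}]

defaulting = [DefaultingCounts(**record) for record in raw_records]
explicit = [ExplicitCounts(**record) for record in raw_records]

# With defaults, the absent values are indistinguishable from real zeros
# and drag the mean down.
naive_mean = mean(r.view_count for r in defaulting)

# With explicit presence, we can count what is missing and average only
# the observed values (or apply whatever imputation the model expects).
observed = [r.view_count for r in explicit if r.view_count is not None]
missing = sum(1 for r in explicit if r.view_count is None)
observed_mean = mean(observed)

print(f"mean with silent defaults: {naive_mean}")
print(f"mean over observed values: {observed_mean} ({missing} missing)")
```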

Another is that Protobuf doesn’t define a versioning scheme for its schemas, but we’d like static guarantees about which schema changes are safe to deploy in both an online SOA environment and an offline analytics environment.

IDOL is a layer on top of Protobuf that addresses these shortcomings: it provides (1) a mechanism distinguishing not-set from False-like values and (2) schema versioning and associated logic to detect breaking versus non-breaking data model updates. These ease deployment of new data models and the ML models which use them, quickening overall iteration time.
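As an illustration of what detecting breaking versus non-breaking updates could look like, here is a minimal sketch of a schema comparison. The rules used (removing a field number or changing a field's type is breaking, adding a new field number is not) are assumptions chosen for the example, not IDOL's actual logic, and the schema representation is a hypothetical simplification.

```python
from typing import Dict, List, Tuple

# field number -> (field name, field type); a simplified stand-in for a
# message schema, not a real Protobuf descriptor.
Schema = Dict[int, Tuple[str, str]]


def breaking_changes(old: Schema, new: Schema) -> List[str]:
    """List the changes that an existing reader of `old` could not tolerate."""
    problems = []
    for number, (name, field_type) in sorted(old.items()):
        if number not in new:
            problems.append(f"field {number} ({name}) removed")
        elif new[number][1] != field_type:
            problems.append(
                f"field {number} ({name}) changed type "
                f"{field_type} -> {new[number][1]}"
            )
    # Field numbers that only exist in `new` are treated as safe additions.
    return problems


v1: Schema = {1: ("user_id", "string"), 2: ("view_count", "int64")}
v2_additive: Schema = {**v1, 3: ("view_percentile", "double")}
v2_retyped: Schema = {1: ("user_id", "string"), 2: ("view_count", "string")}

print(breaking_changes(v1, v2_additive))  # no breaking changes expected
print(breaking_changes(v1, v2_retyped))   # reports the type change on field 2
```

A check like this could run at build time so that an incompatible data model is rejected before it reaches either the online services or the offline analytics jobs.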

Database Schema

In parallel with our relational / MySQL choice, we opted for a simple layout that has proved useful to fit our fast data-rollback requirements and works well with the partitioning scheme.

Schema: CompletedBatchRun stores a pointer to the correct batch in the FeatureSet table, which corresponds to a partition of that table. Adding a new feature set means adding another row to the CompletedBatchRun table for the service’s clients to attach to and loading the FeatureSet rows in batch.

An upstream service wanting a specific user’s view history object will first make a request to CompletedBatchRun to get the corresponding batch_id, then make a second request to FeatureSet given the user ID (generic_key), feature_set_name, and the batch_id it just received. Each request is low latency (~10 ns Aurora SelectLatency, ~10 ms including RPC / network overhead) as they are made on indexed columns. Also, for the second request, the partitioning means we only need to search a small index corresponding to one batch, which is served from the read replica’s cache or, on a miss, from the cluster volume.

We can quickly restore a previous batch_id (for a previous day’s snapshot of user history) by simply changing the corresponding batch_id for the given feature_set_name. We have an internal endpoint using Airflow to do this, allowing for specifying either “previous day” or a specific day within a week of the current one.

We load in batch, using Airflow for scheduling and Luigi for dependency-checking; upstream jobs build each day’s batch by performing aggregations in PySpark.
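The two-step lookup and the pointer-swap rollback can be sketched end to end with an in-memory SQLite database standing in for Aurora MySQL. The table names and the columns batch_id, feature_set_name, and generic_key come from the description above; the payload column, the function names, and the sample data are hypothetical, and SQLite has none of the partitioning or caching behavior discussed earlier.

```python
import sqlite3
from typing import Dict, Optional


def make_store() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE CompletedBatchRun (
            feature_set_name TEXT PRIMARY KEY,
            batch_id INTEGER NOT NULL
        );
        CREATE TABLE FeatureSet (
            batch_id INTEGER NOT NULL,
            feature_set_name TEXT NOT NULL,
            generic_key TEXT NOT NULL,
            payload BLOB NOT NULL,  -- hypothetical: the serialized feature object
            PRIMARY KEY (batch_id, feature_set_name, generic_key)
        );
        """
    )
    return conn


def set_active_batch(conn: sqlite3.Connection, name: str, batch_id: int) -> None:
    """Point a feature set at a batch; a rollback is just this pointer change."""
    conn.execute("INSERT OR REPLACE INTO CompletedBatchRun VALUES (?, ?)", (name, batch_id))


def load_batch(conn: sqlite3.Connection, name: str, batch_id: int, rows: Dict[str, bytes]) -> None:
    """Load one day's rows, then publish the batch by moving the pointer."""
    conn.executemany(
        "INSERT INTO FeatureSet VALUES (?, ?, ?, ?)",
        [(batch_id, name, key, payload) for key, payload in rows.items()],
    )
    set_active_batch(conn, name, batch_id)


def get_features(conn: sqlite3.Connection, name: str, key: str) -> Optional[bytes]:
    """Two indexed lookups; returns None rather than guessing when data is absent."""
    run = conn.execute(
        "SELECT batch_id FROM CompletedBatchRun WHERE feature_set_name = ?", (name,)
    ).fetchone()
    if run is None:
        return None
    row = conn.execute(
        "SELECT payload FROM FeatureSet "
        "WHERE batch_id = ? AND feature_set_name = ? AND generic_key = ?",
        (run[0], name, key),
    ).fetchone()
    return None if row is None else row[0]


conn = make_store()
load_batch(conn, "view_history", 20210527, {"user-1": b"views=3"})
load_batch(conn, "view_history", 20210528, {"user-1": b"views=corrupted"})
before_rollback = get_features(conn, "view_history", "user-1")

set_active_batch(conn, "view_history", 20210527)  # roll back to the previous day
after_rollback = get_features(conn, "view_history", "user-1")
print(before_rollback, after_rollback)
```

Because the older batch's rows are never touched, the rollback costs a single-row update regardless of how many feature rows the batch contains.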

Performance

We initially saw performance degrade in tests with about 180 GB loaded into our single RDS instance with no partition scheme; however, after adding partitioning, we found that at least this amount of data did not increase latencies. Partitioning by batch_id also gave us an easy method of garbage-collecting old rows that were not likely to be rolled back to. We also added a read replica.

We deployed the live service and saw no performance concerns in production, but as Affirm’s traffic is spiky and scaling rapidly, and as we want to support more client models in the future, we stress-tested the service.

Using distributed Locust, we stress-tested the service as a whole, which runs on Python-environment Kubernetes pods on 3 instances (with the logic implementing the feature store pointer schema mentioned above) and connects to the backing Aurora RDS instance: a single read replica in addition to the writer, both relatively small db.r3.xlarge instances. We loaded 500 million rows (about 150 GB in the database) and used Locust workers from 6 client services on 3 containers (one per instance). We reached about 30 RPS before performance degraded on the stage testing environment, which had only r5.large vs. r5.xlarge instances for the application code (so about ½ to 1/10 of the CPU capacity).

Looking at a representative trace, we see that the DB queries themselves take the minority of the endpoint’s time, suggesting the efficacy of scaling up the application services.

We can easily autoscale by adding application containers/instances and Aurora read replicas.

Summary

Aurora, combined with careful schema planning, allowed us to satisfy requirements for performance, logical extensibility, and performance scaling for our batch-updated online machine learning feature store.

Acknowledgements

I worked with countless folks to make this a success, including Brett Trimble for devising the schema, Rex Lin and Niloy Gupta for implementation, Adam Johnston for offline feature store connectivity, prototyping and design, Hugues Bruant for database / platform configuration and setup, Neil Vyas for IDOL logic, and Kevin DelRosso for the inaugural use-case vision.
