Semantic search at billions scale
How to build Semantic search distributed systems using python, pyspark, faiss and clip! A walk through on building a laion5B semantic search system.
Try it for yourself at https://rom1504.github.io/clip-retrieval/
See semantic search: index anything for an introduction to the semantic search topic. See laion5B blogpost for the full laion5B story.
6 months ago with a team of deep learning enthusiasts we released laion400m : a 400 millions image/text dataset. It has been a pretty big success, used in many papers and experiments to produce contrastive and generative models.
To do the preparation work of this dataset, I built several tools. For 400m items I went with the strategy of using a single node with 16 cores and 32GB of ram and building very efficient tools.
Recently, we continued the effort and produced a 5B samples dataset. Doing a 10x scaling requires new techniques and indeed I decided to improve each of my tools to make them distributed, one by one.
I will explain this story in this post, and I hope you learn something about distributed systems and begin to like them as I do. Distributed systems are really powerful!
This post is organized in 4 parts:
- Img2dataset: distributed downloading of 5B samples into 240TB of images
- Clip inference: distributed inference of 240TB of images into 18TB of embeddings
- Autofaiss : distributed indexing of 9TB of embeddings into 800GB of index
- Clip back : serving a semantic search engine using a single 16GB of ram machine
The main idea behind distributed computing is that processing a large amount of data is difficult, however processing many times a small amount of data is much easier!
So splitting the data in small pieces is a crucial part of building a distributed system.
Img2dataset
Img2dataset is a tool that is able to download and resize images from urls very fast.
- Input: url and text files (800GB)
- Output: tar files containing images and text (240TB)
How fast ? About 90 sample/s/core.
Using 16 cores, that means downloading 400m samples takes 3 days, that’s pretty good!
However, what if I have 5B samples? It would take 50 days, that’s way too long!
Using 160 cores would bring back this number to 5 days which is much more reasonable.
Machines with 160 cores are rare and expensive, so let’s instead use 10 machines with 16 cores.
The trick is then only about distributing the computation. The architecture is composed of :
- Splitting the input into shards, save those to disk with reader.py
- Using a generic distributor implementation to support both pyspark and multiprocessing
- A downloader that takes a shard of url and metadata, downloads it and produces an image shard
- A writer to save each output shard with various formats
The important tools I used:
- Pyspark to distribute the task in 10 nodes
- Wandb for metric tracking
- Fsspec for saving to/from any file system
- Webdataset/pyarrow/pandas to handle reading and writing
Read this distributed img2dataset guide to learn how to run this yourself.
I used wandb for tracking the downloads: laion2B download laion2B multi download laion1B nolang download
Tips and tricks:
- Computing exif for billions of items eventually reach some weird edge cases, but you can just fix it
- Building pex automatically on release using github actions makes it easy to distribute the code to many machines
- There are so many ways to resize images, thanks ross wightman for helping improve their support
- Supporting tfrecord can be helpful for reading in jax and tensorflow, thanks boris dayman for this
- Using output files with a zero padding numbering format makes it convenient to sort the files and to list them
- Fsspec was really useful to support writing on s3, local or gcs
Clip inference
Clip inference is a tool that takes image and text data and produces image and text embeddings.
- Input: tar files containing images and text (240TB)
- Output: numpy and parquet files (18TB)
One 3080 GPU can process about 1300 sample/s with the smallest clip model. With the largest, it’s only 315, even with an a100.
1300 sample/s means about 3 days for 400m samples but would take a month for 5B.
Let’s again distribute the computation.
Using 32 a100, we can reach a speed of 10000 sample/s which means about a week to compute 5B embeddings.
The general architecture of the code is composed of :
- A generic distributor to handle both pyspark and sequential strategies and uses the pyspark 3 handling of resources to pick the right gpu
- A runner which calls the reader the mapper and the writer inside an executor
- The sampler which decides which files to read from the input
- The reader which uses a sampler to read part of the data as webdataset
- The mapper which takes an image shard and output an embedding shard
- The writer takes an embedding shard and writes it to the file system using fsspec
- The logger which logs metrics in all executors using a folder in the output file system and wandb
- The main which build the config for all of those and start the inference
This guide explains how to run it yourself.
Using 32 gpus, the inference over laion5B took about a week: laion2B en inference laion2B multi inference laion1B nolang inference
Autofaiss
Autofaiss is a tool that takes clip embeddings and produces knn indices.
- Input: numpy files (9TB)
- Output: indices (800GB)
For 400m embeddings it takes between 4h and 12h on a 16 cores machine depending on the product quantization parameter.
For 5B embeddings it would take between 40 and 120 hours, again that’s too much, let’s distribute.
Autofaiss is a wrapper on top of faiss that automatically selects the right kind of index, reads embeddings and builds indices. Read more at the autofaiss blogpost and autofaiss github
To distribute the index building computation we:
- Split the embeddings collection in N parts: for example a 9TB embedding set is transformer into 100 parts of 90GB
- Add each part to a pretrained index, that leaves us with 100 indices parts
- Merge each of these N parts back to M index, for example 10 indices
See distributed.py
This guide explains how to run this yourself.
Doing this process, we built the embedding reader module which makes it possible to read embeddings efficiently from any file system.
The building of the laion5B index took 16h on 10 nodes with each 16 cores with a PQ of 128.
Tips and tricks:
- The distributed on disk faiss guide is really good to explain the various options for distributed indexing
- Using fsspec here again allowed to support all filesystems, eg s3 or hdfs
Clip back
Clip back allows to plug the index and metadata and search among results.
- Input: indices and metadata (1.6TB)
- Output: a rest api taking image or test and returning similar url and text
For laion400m the largest index was 100GB and the metadata was 100GB
For laion5B, the index is now 800GB and the metadata 800GB as well.
Unlike previous steps, we’re still in the domain of the reasonable for one node, so I didn’t distribute this part. However, this required interesting adaptations that opened the gate for a sharded backend in the future.
So you may be wondering how to serve 1.6TB of data with a single node. The key is to use memory mapping and nvme SSDs! Using these 2 technologies, it’s possible to use no memory for the index and the metadata and instead use the disk :
- In order to map the result from id to metadata, arrow is good for memmapping see ArrowMetadataProvider and pyarrow ipc
- Merge on disk of faiss allows merging 800GB of index at no ram usage
- Faiss allows to load an index with memory mapping, and hence use a 800GB index at no ram usage
Tips and tricks:
Faiss Search and reconstruct allows to post filter embeddings: clip embeddings contain enough information to
- classify an image into safe/unsafe
- Deduplicate results by computing dot products and keeping only one sample per connected components
Hdd drive speed is about 200MB/s ssd 600MB/s and nvme ssd 3GB/s. The iops and latencies also differ drastically. Random access is very slow on hdd, fast on nvme. That has important consequences on the kind of algorithms to use
- External sort works well on hdd ; but however spark sorting works much better on ssd nvme
- Memory mapping for kv works much better on ssd nvme
- On disk knn are much faster on ssd nvme
Limitations and possible improvements
There are some clear limitations on what was built here:
- All the content is static: real search engines need to update the content regularly
- Only batch processing: combining real time / stream processing and batch processing allows a much higher reactivity while keeping costs low
- Semantic and strict filtering are not combined
I believe it is possible to go even beyond this system by:
- Using tools such as apache beam to combine batch and streaming processing
- Using a constantly updated source such as a custom crawler or following commoncrawl updates
- Improving faiss by handling one stage filtering (see this excellent post by pinecone on the topic search filtering )
Conclusion
Thanks for reading!
This was a walk through of all the tools I built in order to produce a semantic search system for five billion samples for the laion5B dataset. I hope it inspires you to build large semantic search systems.
I hope you’re now convinced it’s possible to build semantic systems at the billions scale using a reasonable amount of resources. Imagine what else you could index from the world!