Find Small Images Inside Large Images
Alaeddine @ Jina AI
October 29, 2021
The purpose of this tutorial is to build an image search engine capable of finding small images inside bigger ones. This requires a different architecture than typical image search engines since we need to perform object detection.
Tip
The full source code of this tutorial is available in this Google Colab notebook.
Understanding And Formulating the Problem
Since we want to find small images inside big ones, simply encoding both the indexed images and the query image and then matching them will not work. Imagine that you have the following big image:
It contains a scene with a cat in the background, a bird, and a few other items.
Now let’s suppose that the query image is a simple bird:
Encoding the query image will generate embeddings that effectively represent it. However, it’s not easy to build an encoder that effectively represents the big image, since it contains a complex scene with different objects: the embeddings will not be representative enough. We therefore need a better approach.
Can you think of another solution?
Hint
Encoding a complex image is not easy, but what if we could encode the objects inside it? Imagine that we can identify these objects inside the big image, like so:
Right, identifying the objects inside the big image and then encoding each one of them will result in better, more representative embeddings. Now, we should ask 2 questions:
How can we identify objects?
How can we retrieve the big image if we match the query against the identified objects?
The first question is easy: the answer is object detection. There are many models that can perform object detection, and in this tutorial we will be using YOLOv5. Detected objects can easily be represented as chunks of the original indexed documents.
See Also
If you’re not familiar with chunks in Jina, check this section.
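To make the parent/chunk relationship concrete, here is a minimal sketch (the file names are made up):

from jina import Document

# The parent document holds the full image; each detected object
# will later become a chunk (a sub-document) of it
parent = Document(uri='images/scene.jpg')
parent.chunks.append(Document(uri='images/bird_crop.jpg'))

# Appending to .chunks automatically sets the chunk's parent_id
assert parent.chunks[0].parent_id == parent.id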
The second question is a bit more complex. We will match query documents against the chunks of the original documents, but we need to return the original documents (the big images). We can solve this problem by relying on a ranker executor, which roughly does the following (a plain-Python sketch follows the list):
Retrieve the parent document IDs from the matched chunks along with their scores
For each parent ID, aggregate the scores of that parent
Replace the matches with the parent documents instead of the child documents (aka chunks).
Sort the new matches by their aggregated scores.
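In plain Python, the aggregation idea looks like this (toy scores; we use min because a lower cosine distance means a better match):

from collections import defaultdict

# Toy chunk-level matches: (parent ID, cosine distance of the matched chunk)
chunk_matches = [('parent_a', 0.1), ('parent_b', 0.3), ('parent_a', 0.5)]

parent_scores = defaultdict(list)
for parent_id, score in chunk_matches:
    parent_scores[parent_id].append(score)

# Aggregate each parent's scores with min, then sort best-first
ranked = sorted((min(scores), parent_id) for parent_id, scores in parent_scores.items())
print(ranked)  # [(0.1, 'parent_a'), (0.3, 'parent_b')]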
Cool, that seems like complex logic, but no worries: we will build our ranker executor step by step later. However, note that since the ranker is not a storage executor, it’s not capable of retrieving the parent documents from the chunks. Instead, we can create empty documents that only contain the IDs. This implies that in a later step, we need to retrieve those documents by ID.
Now let’s try to imagine and design our Flows given what we’ve discussed so far:
Index Flow:
Query Flow:
Oh, because we use the ranker, we will need something to help us retrieve the original parent documents by ID. Well, that can be any storage executor. Actually, Jina Hub includes many storage executors, but in this tutorial we will build our own. Since this executor stores the parent documents, we will call it the root_indexer. Also, since we need it in the query Flow, we have to add it to the index Flow as well. One more note: this root_indexer will index documents as they are, so it makes sense to put it in parallel to the other processing steps (segmenting, encoding, …).
Now, the technology behind this executor will be LMDB.
See Also
Jina natively supports complex Flow topologies where you can put executors in parallel. Check out this section to learn more.
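As a quick illustration (the executor names here are placeholders), a pod can be attached directly to the gateway so that it runs in parallel with the main branch, and a final pod can join both branches:

from jina import Flow

f = (
    Flow()
    .add(name='exec_a')
    .add(name='exec_b', needs='gateway')  # starts from the gateway: parallel to exec_a
    .add(name='join_all', needs=['exec_a', 'exec_b'])  # waits for both branches
)
f.plot()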
Cool, but what about the other indexer?
Well, it should support matching and indexing chunks of images after they are segmented. Therefore, it needs to support vector search along with indexing. The Jina Hub already includes such indexers (for example, SimpleIndexer); however, we will create our own version of the simple indexer. And by the way, it will be convenient to name this indexer chunks_indexer.
Alright, before seeing the final architecture, let’s agree on names for our executors:
chunks_indexer: SimpleIndexer
root_indexer: LMDBStorage (well, because we use LMDB)
encoder: CLIPImageEncoder (yes, we will be using the CLIP model to encode images)
segmenter: YoloV5Segmenter. Actually, we could name it object-detector, but segmenter is a term that aligns better with Jina’s terminology
ranker: SimpleRanker (trust me, it’s going to be simple)
Finally, here is what our Flows will look like. Index Flow:
Query Flow:
Pre-requisites
In this tutorial, we will need the following dependencies installed:
pip install Pillow jina==2.1.13 torch==1.9.0 torchvision==0.10.0 transformers==4.9.1 yolov5==5.0.7 lmdb==1.2.1 matplotlib git+https://github.com/jina-ai/jina-commons.git#egg=jina-commons
We also need to download the dataset and unzip it.
You can use the link or the following commands:
wget https://open-images.s3.eu-central-1.amazonaws.com/data.zip
unzip data.zip
You should find 2 folders after unzipping:
images: this folder contains the images that we will index
query: this folder contains small images that we will use as search queries
Building Executors
In this section, we will start developing the necessary executors, for both query and index flows.
CLIPImageEncoder
This encoder encodes an image into embeddings using the CLIP model. We want an executor that loads the CLIP model and uses it to encode images during both the query and index Flows.
Our executor should:
support both GPU and CPU: that’s why we will provision the device parameter and use it when encoding
be able to process documents in batches in order to use our resources effectively: to do so, we will use the batch_size parameter
be able to encode the full image during the query Flow and encode only the chunks during the index Flow: this can be achieved with traversal_paths and the method DocumentArray.batch
from typing import Optional, Tuple

import torch
from jina import DocumentArray, Executor, requests
from transformers import CLIPFeatureExtractor, CLIPModel


class CLIPImageEncoder(Executor):
    """Encode images into embeddings using the CLIP model."""

    def __init__(
        self,
        pretrained_model_name_or_path: str = "openai/clip-vit-base-patch32",
        device: str = "cpu",
        batch_size: int = 32,
        traversal_paths: Tuple = ("r",),
        *args,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        self.batch_size = batch_size
        self.traversal_paths = traversal_paths
        self.pretrained_model_name_or_path = pretrained_model_name_or_path
        self.device = device
        self.preprocessor = CLIPFeatureExtractor.from_pretrained(
            pretrained_model_name_or_path
        )
        self.model = CLIPModel.from_pretrained(self.pretrained_model_name_or_path)
        self.model.to(self.device).eval()

    @requests
    def encode(self, docs: Optional[DocumentArray], parameters: dict, **kwargs):
        if docs is None:
            return
        # Request-time parameters override the defaults set in the constructor
        traversal_paths = parameters.get("traversal_paths", self.traversal_paths)
        batch_size = parameters.get("batch_size", self.batch_size)
        document_batches_generator = docs.batch(
            traversal_paths=traversal_paths,
            batch_size=batch_size,
            require_attr="blob",
        )

        with torch.inference_mode():
            for batch_docs in document_batches_generator:
                blob_batch = [d.blob for d in batch_docs]
                tensor = self._generate_input_features(blob_batch)
                embeddings = self.model.get_image_features(**tensor)
                embeddings = embeddings.cpu().numpy()
                for doc, embed in zip(batch_docs, embeddings):
                    doc.embedding = embed

    def _generate_input_features(self, images):
        input_tokens = self.preprocessor(
            images=images,
            return_tensors="pt",
        )
        # Move the preprocessed tensors to the configured device
        input_tokens = {
            k: v.to(torch.device(self.device)) for k, v in input_tokens.items()
        }
        return input_tokens
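As a quick sanity check, the encoder can be exercised outside any Flow; this sketch feeds it one random dummy image (a real photo works the same way):

import numpy as np
from jina import Document, DocumentArray

encoder = CLIPImageEncoder()  # downloads the CLIP weights on first use
docs = DocumentArray(
    [Document(blob=np.random.randint(0, 255, (224, 224, 3), dtype=np.uint8))]
)
encoder.encode(docs, parameters={})
print(docs[0].embedding.shape)  # (512,) for the ViT-B/32 CLIP model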
YoloV5Segmenter
Since we want to retrieve small images inside bigger images, the technique that we will heavily rely on is segmenting. Basically, we want to perform object detection on the indexed images. This will generate bounding boxes around the objects detected inside the images. The detected objects will be extracted and added as chunks to the original documents. By the way, can you guess what the state-of-the-art object detection model is?
Right, we will use YOLOv5.
Our YoloV5Segmenter should be able to load the ultralytics/yolov5 model from Torch Hub, or otherwise load a custom model. To achieve this, the executor accepts the parameter model_name_or_path, which will be used when loading. We will implement the method _load, which checks if the model exists in Torch Hub and otherwise loads it as a custom model.
For our use case, we will just rely on yolov5s (the small version of yolov5). Of course, for better quality, you can choose a more complex model or your own custom model.
Furthermore, we want the YoloV5Segmenter to support both GPU and CPU, and it should be able to process documents in batches. Again, this is as simple as adding the parameters device and batch_size and using them during segmenting.
To perform the segmenting, we will implement the method _segment_docs, which performs the following steps:
For each batch (a batch consists of several images), use the model to get predictions for each image
Each prediction of an image can contain several detections (because yolov5 will extract as many bounding boxes as possible, along with their confidence scores). We will filter out detections whose scores are below the confidence_threshold in order to keep good quality.
Each detection actually consists of 2 points -top left (x1, y1) and bottom right (x2, y2)- along with a confidence score and a class. We will not use the class of the detection, but it can be useful in other search applications.
With the detections that we have, we create crops (using the 2 points returned). Finally, we add these crops to the image documents as chunks.
from typing import Dict, Iterable, Optional

import torch
from jina import Document, DocumentArray, Executor, requests
from jina_commons.batching import get_docs_batch_generator


class YoloV5Segmenter(Executor):
    def __init__(
        self,
        model_name_or_path: str = 'yolov5s',
        confidence_threshold: float = 0.3,
        batch_size: int = 32,
        device: str = 'cpu',
        *args,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        self.model_name_or_path = model_name_or_path
        self.confidence_threshold = confidence_threshold
        self.batch_size = batch_size
        if device != 'cpu' and not device.startswith('cuda'):
            self.logger.error('Torch device not supported. Must be cpu or cuda!')
            raise RuntimeError('Torch device not supported. Must be cpu or cuda!')
        if device == 'cuda' and not torch.cuda.is_available():
            self.logger.warning(
                'You tried to use GPU but torch did not detect your '
                'GPU correctly. Defaulting to CPU. Check your CUDA installation!'
            )
            device = 'cpu'
        self.device = torch.device(device)
        self.model = self._load(self.model_name_or_path)

    @requests
    def segment(
        self, docs: Optional[DocumentArray] = None, parameters: Dict = {}, **kwargs
    ):
        if docs:
            document_batches_generator = get_docs_batch_generator(
                docs,
                traversal_path=['r'],
                batch_size=parameters.get('batch_size', self.batch_size),
                needs_attr='blob',
            )
            self._segment_docs(document_batches_generator, parameters=parameters)

    def _segment_docs(self, document_batches_generator: Iterable, parameters: Dict):
        with torch.no_grad():
            for document_batch in document_batches_generator:
                images = [d.blob for d in document_batch]
                predictions = self.model(
                    images,
                    size=640,  # inference resolution
                    augment=False,
                ).pred

                for doc, prediction in zip(document_batch, predictions):
                    for det in prediction:
                        x1, y1, x2, y2, conf, cls = det
                        # Skip low-confidence detections
                        if conf < parameters.get(
                            'confidence_threshold', self.confidence_threshold
                        ):
                            continue
                        # Crop the detected object and attach it as a chunk
                        crop = doc.blob[int(y1) : int(y2), int(x1) : int(x2), :]
                        doc.chunks.append(Document(blob=crop))

    def _load(self, model_name_or_path):
        # Load a standard YOLOv5 model from Torch Hub if it exists there,
        # otherwise treat the argument as a path to a custom model
        if model_name_or_path in torch.hub.list('ultralytics/yolov5'):
            return torch.hub.load(
                'ultralytics/yolov5', model_name_or_path, device=self.device
            )
        else:
            return torch.hub.load(
                'ultralytics/yolov5', 'custom', model_name_or_path, device=self.device
            )
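To try the segmenter on its own (this downloads yolov5s from Torch Hub on first use; any RGB image blob works):

import numpy as np
from jina import Document, DocumentArray

segmenter = YoloV5Segmenter()
doc = Document(blob=np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8))
segmenter.segment(DocumentArray([doc]), parameters={})
print(f'{len(doc.chunks)} object(s) detected')  # most likely 0 for random noise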
Indexers
After developing the encoder and the segmenter, we will need 2 kinds of indexers:
SimpleIndexer: This indexer will take care of storing the chunks of images. It also should support vector similarity search, which is important for matching small query images against segments of the original images.
LMDBStorage: LMDB is a simple memory-mapped transactional key-value store. It is convenient for this example because we can use it to store the original images (so that we can retrieve them later). We will use it to create LMDBStorage, which offers 2 functionalities: indexing documents and retrieving documents by ID.
SimpleIndexer
To implement SimpleIndexer, we can leverage Jina’s DocumentArrayMemmap. You can read about this data type here.
Our indexer will create an instance of DocumentArrayMemmap when it’s initialized. We want to store the indexed documents inside the workspace folder; that’s why we pass the workspace attribute of the executor to DocumentArrayMemmap.
To index, we implement the method index, which is bound to the index Flow. It’s as simple as extending the DocumentArrayMemmap instance with the received docs.
On the other hand, for search, we implement the method search. We bind it to the query Flow using the decorator @requests(on='/search').
In Jina, searching for query documents can be done by adding the results to the matches attribute of each query document. Since docs is a DocumentArray, we can use the method match to match the query against the indexed documents. Read more about match here. There’s another detail: we indexed whole documents, but we need to match the query documents against the chunks of the indexed images. Luckily, DocumentArray.match allows us to specify the traversal paths of the right-hand-side parameter via the parameter traversal_rdarray. Since we want to match the left-side docs (the query) against the chunks of the right-side docs (the indexed docs), we specify traversal_rdarray=['c'].
from typing import Dict, Optional

from jina import DocumentArray, Executor, requests
from jina.types.arrays.memmap import DocumentArrayMemmap


class SimpleIndexer(Executor):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Persist indexed documents on disk, inside the executor's workspace
        self._storage = DocumentArrayMemmap(
            self.workspace, key_length=kwargs.get('key_length', 64)
        )

    @requests(on='/index')
    def index(
        self,
        docs: Optional['DocumentArray'] = None,
        **kwargs,
    ):
        if docs:
            self._storage.extend(docs)

    @requests(on='/search')
    def search(
        self,
        docs: Optional['DocumentArray'] = None,
        parameters: Optional[Dict] = None,
        **kwargs,
    ):
        if not docs:
            return
        # Match the query docs against the chunks of the stored docs
        docs.match(self._storage, traversal_rdarray=['c'])
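Here is a toy round trip with our SimpleIndexer (a sketch: it assumes a writable workspace folder and uses hand-made 2-d embeddings instead of CLIP ones):

import numpy as np
from jina import Document, DocumentArray

indexer = SimpleIndexer(metas={'workspace': 'workspace'})

parent = Document()
parent.chunks.append(Document(embedding=np.array([1.0, 0.0])))
indexer.index(DocumentArray([parent]))

query = Document(embedding=np.array([1.0, 0.1]))
indexer.search(DocumentArray([query]))
# The match is a chunk, but it keeps a reference to its parent
print(query.matches[0].parent_id == parent.id)  # True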
LMDBStorage
In order to implement the LMDBStorage, we need the following parts:
I. Handler
This will be a context manager that we will use when we access our LMDB database. We will create it as a standalone class.
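If you have never used LMDB before, here is the bare minimum of how it works as a standalone key-value store (a toy sketch, independent of Jina):

import lmdb

env = lmdb.Environment('demo.lmdb', map_size=10 * 1024 * 1024, subdir=False)
with env.begin(write=True) as txn:  # a write transaction
    txn.put(b'doc-1', b'serialized bytes')
with env.begin() as txn:  # a read-only transaction
    print(txn.get(b'doc-1'))  # b'serialized bytes'
env.close()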
II. LMDBStorage constructor
The constructor should initialize a few attributes:
the map_size of the database
the default_traversal_paths. We need traversal paths because we will not be traversing documents in the same way during the index and query Flows: during index, we want to store the root documents, while during query, we need to get the matches of documents by ID
the index file: again, to keep things clean, we will store the index file inside the workspace folder. Therefore, we can use the workspace attribute
III. LMDBStorage.index
In order to index documents, we first start a transaction (so that our storage executor is ACID-compliant). Then, we traverse the documents according to the traversal_paths (which will be the root in the index Flow). Finally, each document is serialized and added to the database, with the document ID as the key.
IV. LMDBStorage.search
Unlike search in the SimpleIndexer, we only wish to get the matched documents by ID and return them. Actually, the matched documents will be empty and will only contain IDs; the goal is to return the full matched documents using those IDs. To accomplish this, we again start a transaction, traverse the matched documents, get each matched document by ID, and use the result to fill in our documents.
import os
from typing import Dict, List

import lmdb
from jina import Document, DocumentArray, Executor, requests


class _LMDBHandler:
    def __init__(self, file, map_size):
        # see https://lmdb.readthedocs.io/en/release/#environment-class for usage
        self.file = file
        self.map_size = map_size

    @property
    def env(self):
        return self._env

    def __enter__(self):
        self._env = lmdb.Environment(
            self.file,
            map_size=self.map_size,
            subdir=False,
            readonly=False,
            metasync=True,
            sync=True,
            map_async=False,
            mode=0o755,
            create=True,
            readahead=True,
            writemap=False,
            meminit=True,
            max_readers=126,
            max_dbs=0,  # means only one db
            max_spare_txns=1,
            lock=True,
        )
        return self._env

    def __exit__(self, exc_type, exc_val, exc_tb):
        if hasattr(self, '_env'):
            self._env.close()


class LMDBStorage(Executor):
    def __init__(
        self,
        map_size: int = 1048576000,  # in bytes, 1000 MB
        default_traversal_paths: List[str] = ['r'],
        *args,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        self.map_size = map_size
        self.default_traversal_paths = default_traversal_paths
        self.file = os.path.join(self.workspace, 'db.lmdb')
        if not os.path.exists(self.workspace):
            os.makedirs(self.workspace)

    def _handler(self):
        return _LMDBHandler(self.file, self.map_size)

    @requests(on='/index')
    def index(self, docs: DocumentArray, parameters: Dict, **kwargs):
        if docs is None:
            return
        traversal_paths = parameters.get(
            'traversal_paths', self.default_traversal_paths
        )
        with self._handler() as env:
            with env.begin(write=True) as transaction:
                # Store each document under its ID, serialized to bytes
                for d in docs.traverse_flat(traversal_paths):
                    transaction.put(d.id.encode(), d.SerializeToString())

    @requests(on='/search')
    def search(self, docs: DocumentArray, parameters: Dict, **kwargs):
        if docs is None:
            return
        traversal_paths = parameters.get(
            'traversal_paths', self.default_traversal_paths
        )
        docs_to_get = docs.traverse_flat(traversal_paths)
        with self._handler() as env:
            with env.begin(write=True) as transaction:
                for d in docs_to_get:
                    doc_id = d.id
                    # Fetch the stored document and copy its content into d,
                    # keeping the original ID
                    serialized_doc = Document(transaction.get(d.id.encode()))
                    d.update(serialized_doc)
                    d.id = doc_id
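A quick round trip (again a sketch; it assumes a writable workspace folder):

from jina import Document, DocumentArray

storage = LMDBStorage(metas={'workspace': 'workspace'})
storage.index(
    DocumentArray([Document(id='doc-1', tags={'filename': 'x.jpg'})]), parameters={}
)

# Simulate what the ranker produces: a match that only carries an ID
query = Document()
query.matches.append(Document(id='doc-1'))
storage.search(DocumentArray([query]), parameters={'traversal_paths': ['m']})
print(query.matches[0].tags['filename'])  # x.jpg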
SimpleRanker
You might wonder: why do we need a ranker at all?
Actually, a ranker is needed because we will be matching small query images against the chunks of parent documents. But how can we get back to the parent documents (aka the full images) given the chunks? And what if 2 chunks belonging to the same parent are matched? We can solve this by aggregating the similarity scores of chunks that belong to the same parent (using an aggregation method; in our case, the min value). So, for each query document, we perform the following steps:
We create an empty collection of parent scores. This collection will store, for each parent, a list of scores of its chunk documents.
For each match, since it’s a chunk document, we can retrieve its parent_id. And since it’s also a match document, we get its match score and add that value to the parent scores collection.
After processing all the matches, we aggregate the scores of each parent using the min metric.
Finally, using the aggregated score values of the parents, we create a new list of matches (this time consisting of parents, not chunks). We also sort the matches list by aggregated score.
When query documents exit the SimpleRanker, they now have matches consisting of parent documents. However, parent documents just have IDs. That’s why, during the previous steps, we created LMDBStorage: to actually retrieve parent documents by IDs and fill them with data.
from collections import defaultdict
from typing import Dict, Optional

from jina import Document, DocumentArray, Executor, requests


class SimpleRanker(Executor):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.metric = 'cosine'

    @requests(on='/search')
    def rank(
        self, docs: Optional[DocumentArray] = None, parameters: Dict = {}, **kwargs
    ):
        if docs is None:
            return

        for doc in docs:
            # Collect the scores of matched chunks, grouped by parent ID
            parents_scores = defaultdict(list)
            for m in DocumentArray([doc]).traverse_flat(['m']):
                parents_scores[m.parent_id].append(m.scores[self.metric].value)

            # Aggregate match scores for each parent document and
            # create the doc's matches from the parents of matched chunks
            new_matches = []
            for match_parent_id, scores in parents_scores.items():
                score = min(scores)
                new_matches.append(
                    Document(id=match_parent_id, scores={self.metric: score})
                )

            # Sort the matches by aggregated score (ascending, since a lower
            # cosine distance means a better match)
            doc.matches = new_matches
            doc.matches.sort(key=lambda d: d.scores[self.metric].value)
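A toy example of what the ranker does: two chunks of parent p1 and one chunk of p2 are matched, and with min-aggregation p1 ends up first:

from jina import Document, DocumentArray

query = Document()
for parent_id, score in [('p1', 0.1), ('p1', 0.4), ('p2', 0.2)]:
    query.matches.append(Document(parent_id=parent_id, scores={'cosine': score}))

SimpleRanker().rank(DocumentArray([query]), parameters={})
print([m.id for m in query.matches])  # ['p1', 'p2']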
Building Flows
Indexing
Now, after creating executors, it’s time to use them in order to build an index Flow and index our data.
Building the index Flow
We create a Flow object and add executors one after the other with the right parameters:
YoloV5Segmenter: We should also specify the device
CLIPImageEncoder: It also receives the device parameter. And since we only encode the chunks, we specify 'traversal_paths': ['c']
SimpleIndexer: We need to specify the workspace parameter
LMDBStorage: We also need to specify the workspace parameter. Furthermore, this executor can run in parallel to the other branch; we achieve this using needs='gateway'. Finally, we set default_traversal_paths to ['r']
A final executor which just waits for both branches.
After building the index Flow, we can plot it to verify that we’re using the correct architecture.
from jina import Flow

device = 'cpu'  # switch to 'cuda' if a GPU is available

index_flow = Flow().add(uses=YoloV5Segmenter, name='segmenter', uses_with={'device': device}) \
    .add(uses=CLIPImageEncoder, name='encoder', uses_with={'device': device, 'traversal_paths': ['c']}) \
    .add(uses=SimpleIndexer, name='chunks_indexer', workspace='workspace') \
    .add(uses=LMDBStorage, name='root_indexer', workspace='workspace', needs='gateway', uses_with={'default_traversal_paths': ['r']}) \
    .add(name='wait_both', needs=['root_indexer', 'chunks_indexer'])
index_flow.plot()
Now it’s time to index the dataset that we have downloaded. We will index the images inside the images folder. The following helper function converts image files into Jina Documents and yields them:
from glob import glob
from jina import Document


def input_generator():
    for filename in glob('images/*.jpg'):
        doc = Document(uri=filename, tags={'filename': filename})
        doc.load_uri_to_image_blob()
        yield doc
The final step in this section is to send the input documents to the index Flow. Note that indexing can take a while:
with index_flow:
    input_docs = input_generator()
    index_flow.post(on='/index', inputs=input_docs, show_progress=True)
Using cache found in /root/.cache/torch/hub/ultralytics_yolov5_master
Using cache found in /root/.cache/torch/hub/ultralytics_yolov5_master
⠏ 4/6 waiting segmenter encoder to be ready...YOLOv5 🚀 2021-10-29 torch 1.9.0+cu111 CPU
⠋ 4/6 waiting segmenter encoder to be ready...Fusing layers...
⠼ 4/6 waiting segmenter encoder to be ready...Model Summary: 213 layers, 7225885 parameters, 0 gradients
Adding AutoShape...
Flow@...[I]:🎉 Flow is ready to use!
🔗 Protocol: GRPC
🏠 Local access: 0.0.0.0:44619
🔒 Private network: 172.28.0.2:44619
🌐 Public address: 34.73.118.227:44619
⠦ DONE ━━╸━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 0:01:11 0.0 step/s 2 steps done in 1 minute and 11 seconds
Searching
Now, let’s build the search Flow and use it in order to find sample query images.
Our Flow contains the following executors:
CLIPImageEncoder: It receives the device parameter. This time, since we want to encode the root query documents, we specify 'traversal_paths': ['r']
SimpleIndexer: We need to specify the workspace parameter
SimpleRanker
LMDBStorage: First we specify the workspace parameter. Then, we need to use different traversal paths; this time we will be traversing matches: 'default_traversal_paths': ['m']
from jina import Flow
device = 'cpu'
query_flow = Flow().add(uses=CLIPImageEncoder, name='encoder', uses_with={'device': device, 'traversal_paths': ['r']}) \
    .add(uses=SimpleIndexer, name='chunks_indexer', workspace='workspace') \
    .add(uses=SimpleRanker, name='ranker') \
    .add(uses=LMDBStorage, workspace='workspace', name='root_indexer', uses_with={'default_traversal_paths': ['m']})
Let’s plot our Flow:
query_flow.plot()
Finally, we can start querying. We will use the images inside the query folder. For each image, we will create a Jina Document. Then we send our documents to the query Flow and receive the response.
For each query document, we can plot the image and its top 3 search results.
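The show_docs helper is defined in the notebook; a minimal version that simply plots the top matches could look like this (an assumed implementation, not verbatim from the notebook):

import matplotlib.pyplot as plt


def show_docs(docs, top_k=3):
    # Plot the image blob of each of the top k matched documents
    for doc in docs[:top_k]:
        plt.imshow(doc.blob)
        plt.show()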
import glob

with query_flow:
    docs = [Document(uri=filename) for filename in glob.glob('query/*.jpg')]
    for doc in docs:
        doc.load_uri_to_image_blob()
    resp = query_flow.post('/search', docs, return_results=True)
    for doc in resp[0].docs:
        print('query:')
        plt.imshow(doc.blob)
        plt.show()
        print('results:')
        show_docs(doc.matches)
Sample results:
query:
results:
Congratulations!
The approach that we’ve adopted could effectively match the small bird image against bigger images containing birds.
Again, the full source code of this tutorial is available in this Google Colab notebook.
Feel free to try it!