Commit 1e488362 authored by Vladyslav Yazykov's avatar Vladyslav Yazykov
Browse files

Removed unneeded files

parent f89ab21e
"""
Engine built on top of precomputed local descriptors.
"""
from __future__ import annotations
import time
import os.path
import json
import traceback
from dataclasses import dataclass
import h5py
import numpy as np
from flask import Flask, request, jsonify
from werkzeug.exceptions import HTTPException
from utilities.path2id import path2id
from data_loading.read_invfile import *
from image_search import unconstrained_search, zoom_in, zoom_out
# Config
# Maps dataset name -> engine name -> path to that engine's inverted file.
DATASETS = {
    # Dataset
    "bow_600k": {
        # Engine
        # - when in the singularity container, the path 'horn:/local/vrg3_demo/vrg3_demo/data' is mapped to '/vrg3/data'
        "zoom_in": "/local/vrg3_demo/vrg3_demo/data/bow_600k/files/invfile.dat"
    },
}
# Input data loading: inverted file
# NOTE(review): `load_invfiles` comes from the star import of
# data_loading.read_invfile; it presumably returns a nested mapping
# {dataset_name: {engine_name: engine}} -- verify against that module.
ENGINE = load_invfiles(DATASETS)
# API endpoint
app = Flask(__name__)
def _parse_request(request):
    """Extract and validate the JSON payload of an incoming request.

    Accepts either an ``application/json`` body or a form field named
    ``data`` holding a JSON string, and returns the inner ``data`` dict.

    Raises:
        ValueError: if the dataset or engine name is not configured in ENGINE.
        KeyError: if the payload carries no ``data`` entry.
    """
    if request.content_type == "application/json":
        payload = request.get_json()
    else:
        payload = {'data': json.loads(request.form['data'])} if 'data' in request.form else {}
    data = payload['data']
    # Explicit raises instead of `assert`: asserts are stripped under `python -O`,
    # which would silently disable this input validation.
    if data['dataset_name'] not in ENGINE:
        raise ValueError("Unsupported dataset")
    if data['engine_name'] not in ENGINE[data['dataset_name']]:
        raise ValueError("Unsupported network")
    return data
@app.route("/capabilities", methods=['POST'])
def capabilities():
    """Describe this engine's search capabilities to the frontend."""
    engine_options = [
        {
            "id": "mode",
            "label": "Search mode",
            "type": "option",
            "values": ["Unconstrained", "Zoom-in", "Zoom-out"],
            "default": "Unconstrained",
        },
    ]
    images_overlays = [
        {"id": "rank", "label": "Rank", "type": "text", "default": True},
        {"id": "name", "label": "Name", "type": "text", "default": True},
        {"id": "bbxs", "label": "Bounding box", "type": "rectangles", "default": True},
    ]
    payload = {
        "data": {
            "search_types": ["image"],
            "engine_options": engine_options,
            "images_overlays": images_overlays,
        }
    }
    return jsonify(payload), 200
@app.route("/images", methods=['POST'])
def images():
    """Main search endpoint: browsing, or search-by-image in three modes.

    Returns a JSON page of results selected by the request's ``offset`` and
    ``limit``, each image annotated with rank/name/bounding-box overlays.
    """
    time0 = time.time()
    data = _parse_request(request)
    dataset = ENGINE[data["dataset_name"]]
    mode = None
    if not data.get("query", None):
        # Browsing functionality: image paths from the chosen dataset,
        # shown in a fixed pseudo-random order (seeded for stable paging).
        images = next(iter(dataset.values())).paths
        np.random.seed(seed=0)
        ranks = np.random.permutation(np.arange(len(images)))
        mode = "browsing"
        bbxs = [[] for _ in range(len(ranks))]
    elif data["query"]["type"] == "image":
        # Search-by-image functionality.  Raise (not assert -- stripped
        # under -O) when the query image is from a different dataset.
        if data["query"]["value"]["prefix"] != data["dataset_name"]:
            raise ValueError("Query image does not belong to the chosen dataset")
        engine = dataset[data["engine_name"]]
        images = engine.paths
        query_id = path2id(data["query"]["value"]["path"], engine.cid2id)
        search_mode = data["engine_options"]["mode"]
        if search_mode == "Unconstrained":
            # Do unconstrained search
            ranks = unconstrained_search(query_id, engine)
            bbxs = [[] for _ in range(len(ranks))]
        elif search_mode == "Zoom-in":
            # Do zoom-in search
            ranks, bbxs = zoom_in(query_id, engine)
        elif search_mode == "Zoom-out":
            # Do zoom-out search
            ranks, bbxs = zoom_out(query_id, engine)
        else:
            raise ValueError("Unknown search mode")
        # The query itself is always the first result -- drop it.
        ranks = ranks[1:]
        bbxs = bbxs[1:]
        mode = "image"
    else:
        raise ValueError("Unknown query type")

    # Output building: page the ranked list.
    # (Renamed from `slice`, which shadowed the builtin.)
    page = [images[i] for i in ranks[data['offset']:data['offset'] + data['limit']]]

    def parse_name(path):
        # Bare file name without directory or extension.
        return os.path.splitext(os.path.basename(path))[0]

    outdata = {
        "results": len(images),
        "images": [{"prefix": data['dataset_name'],
                    "path": x,
                    "overlays": {
                        "rank": data['offset'] + i + 1,
                        "name": parse_name(x),
                        "bbxs": bbxs[i],
                    }} for i, x in enumerate(page)],
    }
    if mode == "image":
        outdata["query_text"] = "%s results in %.3fs (engine_options: %s)" % (
            len(images), time.time() - time0, data["engine_options"])
        outdata["query_image"] = {"overlays": {"name": parse_name(data["query"]["value"]["path"])}}
    return jsonify({"data": outdata}), 200
# Error handling
@app.errorhandler(HTTPException)
def handle_exception(e):
    """Return JSON instead of HTML for HTTP errors."""
    response = e.get_response()
    body = {
        "error": e.name,
        "message": e.description,
    }
    response.data = json.dumps(body)
    response.content_type = "application/json"
    return response
@app.errorhandler(Exception)
def handle_unexpected_exception(e):
    """Turn uncaught non-HTTP exceptions into a JSON 500 response.

    Renamed from ``handle_exception``: the module previously defined two
    functions under that name, so this one shadowed the HTTPException
    handler at module level (Flask registration still worked, but the
    redefinition hid the first handler and trips linters).
    """
    # Pass HTTP errors through to the dedicated HTTPException handler.
    if isinstance(e, HTTPException):
        return e
    # Log the full traceback server-side; return only the repr to the client.
    traceback.print_exc()
    return jsonify({"error": repr(e)}), 500
......@@ -12,7 +12,7 @@ datasets:
max_tc: 600
max_MxN: 10
max_spatial: 50
inlier_threshold: 5
inlier_threshold: 8
minimum_score: 0.0
use_query_expansion: True
max_qe_keypoints: 1500
......
#from .image_search import *
#from .spatial_verification import *
#from .utils import *
from .search_methods import *
from .search_methods import search_methods
import numpy as np
def get_A_matrix_from_geom(geom: np.ndarray):
    """Build the 3x3 homogeneous affine matrix encoded in a geometry vector.

    The first five entries of ``geom`` are (x, y, a, b, c), producing

        [[a, 0, x],
         [b, c, y],
         [0, 0, 1]]

    :param geom: local-feature geometry vector (length >= 5)
    :return: 3x3 matrix as np.ndarray
    """
    x, y, a, b, c = geom[:5]
    return np.array([
        [a, 0, x],
        [b, c, y],
        [0, 0, 1],
    ])
import numpy as np
from engine import Engine
def get_tentative_correspondences(lbl: np.ndarray, idxs: np.ndarray, engine: Engine) -> np.ndarray:
    """
    Compute tentative correspondences.

    :param lbl: visual-word labels of the query keypoints (flattened to 1-D)
    :param idxs: indices of database images to match against
    :param engine: provides per-image visual words and the matching limits
    :return: correspondences np.ndarray of arrays of shape [num_correspondences x 2]
    """
    # Limits from the engine configuration:
    #   max_tc  -- cap on tentative correspondences kept per image
    #   max_MxN -- cap on the M*N ambiguity product of a word match
    max_tc = engine.options.max_tc
    max_MxN = engine.options.max_MxN
    correspondences = []
    lbl = lbl.flatten()
    num_keypoints_in_query = lbl.shape[0]
    for idx in idxs:
        # Visual words present in database image `idx`.
        vw_rel = engine.get_vw_in_image(idx)
        num_words_in_img = len(vw_rel)
        # C[q, d] is True where query keypoint q and image keypoint d carry
        # the same visual word.
        C = np.repeat([vw_rel], num_keypoints_in_query, axis=0) == np.repeat([lbl], num_words_in_img, axis=0).T
        # MxN[q, d] = (#image keypoints matching q) * (#query keypoints
        # matching d): a measure of how ambiguous the word match is.
        MxN = np.outer(C.sum(1), C.sum(0))
        C = C * MxN
        # Keep matches that exist (C > 0) and are not too ambiguous (<= max_MxN).
        tentative_correspondences = np.where((C > 0) & (C <= max_MxN))
        MxN = MxN[tentative_correspondences]
        # Least ambiguous correspondences first (ascending MxN).
        argsorted_MxN = np.argsort(MxN)
        tentative_correspondences = np.array(tentative_correspondences).T
        tentative_correspondences = tentative_correspondences[argsorted_MxN]
        if len(tentative_correspondences) > max_tc:
            tentative_correspondences = tentative_correspondences[:max_tc]
        correspondences.append(tentative_correspondences)
    # NOTE(review): rows may have different lengths, so for multiple images
    # this builds an object-dtype array of per-image correspondence arrays.
    return np.array(correspondences)
import random
import numpy as np
from scipy.sparse import csr_matrix
def kmeans(num_clusters: int, data: np.ndarray, max_num_iterations: int = 1000) -> (np.ndarray, float):
    """
    Cluster data [num_points, num_dims] into num_clusters, using K-Means algorithm.
    If not converged yet, stop after max_num_iterations.

    :param num_clusters: number of clusters to produce
    :param data: points to cluster, shape [num_points, num_dims]
    :param max_num_iterations: iteration cap when convergence is not reached
    :return: (clusters, distances_sum) -- centroids [num_clusters, num_dims]
             and the summed distance of every point to its assigned centroid
    """
    # Initialize from a random sample of distinct data points.  Cast to float
    # so the in-place update below does not truncate fractional means when
    # `data` has an integer dtype.
    centroids = data[random.sample(range(data.shape[0]), num_clusters)].astype(float)
    assigned_centroids = np.zeros(centroids.shape)
    for _ in range(max_num_iterations):
        centers, _ = nearest(centroids, data)
        for c in range(num_clusters):
            cluster_members = data[centers == c]
            if len(cluster_members) != 0:
                assigned_centroids[c] = cluster_members.mean(axis=0)
            else:
                # Re-seed an empty cluster with a random data point.
                assigned_centroids[c] = data[random.sample(range(data.shape[0]), 1)]
        if np.allclose(centroids, assigned_centroids):
            break  # converged
        centroids[:, :] = assigned_centroids[:, :]
    dist = np.linalg.norm(centroids[centers] - data, axis=1)
    return centroids, dist.sum()
def nearest(means: np.ndarray, data: np.ndarray) -> (np.ndarray, np.ndarray):
    """
    For each data-point in data [num_points, num_dims] find nearest mean [num_means, num_dims].
    Return indices [num_points, ] and distances [num_points, ] to the nearest mean for each data-point.

    :param means: candidate centers, shape [num_means, num_dims]
    :param data: query points, shape [num_points, num_dims]
    :return: (idxs, distances)
    """
    # Pairwise point-to-mean distances, shape [num_points, num_means].
    pairwise = np.linalg.norm(data[:, None, :] - means[None, :, :], axis=2)
    nearest_idx = pairwise.argmin(axis=1)
    rows = np.arange(nearest_idx.size)
    return nearest_idx, pairwise[rows, nearest_idx]
def create_db(imgs_visual_words: np.ndarray, num_words: int) -> csr_matrix:
    """
    Create database [num_words, num_imgs] of word weights represented as csr_matrix.

    imgs_visual_words is an array of arrays of length num_imgs; each inner
    np.array lists the visual words occurring in one image (lengths differ
    per image).  Each column holds the L2-normalized word counts of one image.

    :param imgs_visual_words: per-image visual-word arrays, length num_imgs
    :param num_words: vocabulary size (number of rows)
    :return: db, csr_matrix of shape [num_words, num_imgs]
    """
    num_imgs = imgs_visual_words.shape[0]
    rows, cols, vals = [], [], []
    for i, words in enumerate(imgs_visual_words):
        w, counts = np.unique(words, return_counts=True)
        vals.append(counts / np.linalg.norm(counts))
        rows.append(w)
        # Integer column indices: the previous item-assignment used
        # float-valued index arrays, which scipy's indexing rejects.
        cols.append(np.full(w.shape, i, dtype=int))
    if not vals:
        return csr_matrix((num_words, num_imgs))
    # Single-shot COO-style construction: avoids the quadratic cost and the
    # SparseEfficiencyWarning of item-assigning into a csr_matrix.
    return csr_matrix(
        (np.concatenate(vals), (np.concatenate(rows), np.concatenate(cols))),
        shape=(num_words, num_imgs),
    )
def create_db_tfidf(imgs_visual_words: np.ndarray, num_words: int, idf: np.ndarray) -> csr_matrix:
    """
    Create database [num_words, num_imgs] of tf-idf word weights as a csr_matrix.

    imgs_visual_words is an array of arrays of length num_imgs; each inner
    np.array lists the visual words of one image (lengths differ per image).
    Each column holds the idf-weighted, L2-normalized counts of one image.

    :param imgs_visual_words: per-image visual-word arrays, length num_imgs
    :param num_words: vocabulary size (number of rows)
    :param idf: Inverse Document Frequency, shape [num_words, 1]
    :return: db, csr_matrix of shape [num_words, num_imgs]
    """
    num_imgs = imgs_visual_words.shape[0]
    rows, cols, vals = [], [], []
    for i, words in enumerate(imgs_visual_words):
        w, counts = np.unique(words, return_counts=True)
        weighted = idf[w, 0] * counts
        vals.append(weighted / np.linalg.norm(weighted))
        rows.append(w)
        # Integer column indices: the previous item-assignment used
        # float-valued index arrays, which scipy's indexing rejects.
        cols.append(np.full(w.shape, i, dtype=int))
    if not vals:
        return csr_matrix((num_words, num_imgs))
    # Single-shot construction instead of quadratic csr item-assignment.
    return csr_matrix(
        (np.concatenate(vals), (np.concatenate(rows), np.concatenate(cols))),
        shape=(num_words, num_imgs),
    )
def get_idf(imgs_visual_words: np.ndarray, num_words: int) -> np.ndarray:
    """
    Compute the Inverse Document Frequency vector, shape [num_words, 1].

    imgs_visual_words is an array of arrays of length num_imgs; each inner
    np.array lists the visual words occurring in one image.

    :param imgs_visual_words: per-image visual-word arrays
    :param num_words: vocabulary size
    :return: idf, shape [num_words, 1]
    """
    num_docs = imgs_visual_words.shape[0]
    # Document frequency: in how many images does each word appear.
    doc_freq = np.zeros((num_words, 1))
    for words in imgs_visual_words:
        doc_freq[np.unique(words)] += 1
    # Epsilon guards the division; unseen words are then zeroed explicitly
    # instead of receiving the huge log(D / eps) value.
    idf = np.log(num_docs / np.maximum(doc_freq, 1e-10))
    idf[doc_freq == 0] = 0
    return idf
def create_query(query, num_words, idf):
    """
    Build a tf-idf weighted, L2-normalized query vector as a csr_matrix.

    :param query: array of visual words present in the query image
    :param num_words: vocabulary size
    :param idf: Inverse Document Frequency, shape [num_words, 1]
    :return: q, csr_matrix of shape [1, num_words]
    """
    w, counts = np.unique(query, return_counts=True)
    weighted = idf[w, 0] * counts
    weighted = weighted / np.linalg.norm(weighted)
    # Direct sparse construction; the previous item-assignment indexed the
    # matrix with float-valued arrays, which scipy rejects.
    rows = np.zeros(w.shape, dtype=int)
    return csr_matrix((weighted, (rows, w)), shape=(1, num_words))
import numpy as np
import math
import torch
import torch.nn.functional as F
import typing
def get_gausskernel_size(sigma, force_odd=True):
    """Kernel width covering roughly +/- 3 sigma; forced odd by default."""
    width = 2 * math.ceil(3.0 * sigma) + 1
    if force_odd and width % 2 == 0:
        width += 1
    return int(width)
def gaussian1d(x: torch.Tensor, sigma: float) -> torch.Tensor:
    '''Values of a (1D) Gaussian pdf with zero mean and variance sigma^2.'''
    coef = 1. / (math.sqrt(2.0 * math.pi) * sigma)
    exponent = -(x ** 2) / (2.0 * sigma ** 2)
    return coef * torch.exp(exponent)
def gaussian_deriv1d(x: torch.Tensor, sigma: float) -> torch.Tensor:
    '''First derivative of a (1D) Gaussian, mean-subtracted so it sums to zero.'''
    # Gaussian inlined (same operations as gaussian1d).
    coef = 1. / (math.sqrt(2.0 * math.pi) * sigma)
    gauss = coef * torch.exp(-(x ** 2) / (2.0 * sigma ** 2))
    deriv = -x * gauss / (sigma ** 2)
    # Zero-mean so the kernel has no DC response.
    return deriv - deriv.mean()
def gaussian_deriv1d_2nd(x: torch.Tensor, sigma: float) -> torch.Tensor:
    '''Second derivative of a (1D) Gaussian, mean-subtracted so it sums to zero.'''
    # Gaussian inlined (same operations as gaussian1d).
    coef = 1. / (math.sqrt(2.0 * math.pi) * sigma)
    gauss = coef * torch.exp(-(x ** 2) / (2.0 * sigma ** 2))
    deriv2 = gauss * (x * x - sigma ** 2) / (sigma ** 4)
    # Zero-mean so the kernel has no DC response.
    return deriv2 - deriv2.mean()
def filter2d(x: torch.Tensor, kernel: torch.Tensor) -> torch.Tensor:
    """Convolve a tensor with a single 2D kernel, independently per channel.

    Replicate padding keeps the output the same spatial size as the input.

    Args:
        x (torch.Tensor): input tensor of shape :math:`(B, C, H, W)`.
        kernel (torch.Tensor): kernel of shape :math:`(kH, kW)`.

    Return:
        torch.Tensor: the convolved tensor, same shape as the input.
    """
    _, c, _, _ = x.shape
    height, width = kernel.size()
    # Lift the kernel to (1, 1, kH, kW) and match the input's device/dtype.
    tmp_kernel: torch.Tensor = kernel[None, None, ...].to(x.device).to(x.dtype)
    # F.pad order is (left, right, top, bottom).
    # (Annotation fixed: the original said `List[int]`, an undefined name here.)
    padding_shape: list[int] = [width // 2, width // 2, height // 2, height // 2]
    input_pad: torch.Tensor = F.pad(x, padding_shape, mode='replicate')
    # Depthwise convolution (groups=c) applies the same kernel to every channel.
    out = F.conv2d(input_pad, tmp_kernel.expand(c, -1, -1, -1),
                   groups=c,
                   padding=0,
                   stride=1,
                   )
    return out
def gaussian_filter2d(x: torch.Tensor, sigma: float) -> torch.Tensor:
    r"""Blur a tensor with a separable Gaussian filter.

    Arguments:
        sigma (float): the standard deviation of the kernel.
    Returns:
        Tensor: the blurred tensor.
    Shape:
        - Input: :math:`(B, C, H, W)`
        - Output: :math:`(B, C, H, W)`
    """
    ksize = get_gausskernel_size(sigma)
    half = float(ksize // 2)
    support = torch.linspace(-half, half, ksize)
    row_kernel = gaussian1d(support, sigma).reshape(1, -1)
    # Separable convolution: horizontal pass, then vertical pass.
    blurred_horizontal = filter2d(x, row_kernel)
    return filter2d(blurred_horizontal, row_kernel.t())
def spatial_gradient_first_order(x: torch.Tensor, sigma: float) -> torch.Tensor:
    r"""First-order image derivatives in x and y using Gaussian-derivative filters.

    Return:
        torch.Tensor: spatial gradients; dim 2 holds (dx, dy).
    Shape:
        - Input: :math:`(B, C, H, W)`
        - Output: :math:`(B, C, 2, H, W)`
    """
    # (Unused `b, c, h, w` unpacking removed.)
    ksize = get_gausskernel_size(sigma)
    kernel_inp = torch.linspace(-float(ksize // 2), float(ksize // 2), ksize)
    gx = gaussian_deriv1d(kernel_inp, sigma).reshape(1, -1)
    g = gaussian1d(kernel_inp, sigma).reshape(1, -1)
    # Separable kernel: smooth along one axis, differentiate along the other.
    kernel_x = torch.mm(g.t(), gx)
    outx = filter2d(x, kernel_x)
    outy = filter2d(x, kernel_x.t())
    # Stack the two derivative maps along a new dim 2.
    return torch.cat([outx.unsqueeze(2), outy.unsqueeze(2)], dim=2)
def spatial_gradient_first_order_fast(x: torch.Tensor, sigma: float) -> torch.Tensor:
    r"""First-order derivatives via Gaussian blur followed by central differences.

    Return:
        torch.Tensor: spatial gradients; dim 2 holds (dx, dy).
    Shape:
        - Input: :math:`(B, C, H, W)`
        - Output: :math:`(B, C, 2, H, W)`
    """
    # (Unused locals and commented-out kernel construction removed.)
    # Smooth first, then apply a cheap [0.5, 0, -0.5] central-difference kernel.
    filtered_input = gaussian_filter2d(x, sigma)
    gx = torch.tensor([[0.5, 0, -0.5]]).float()
    outx = filter2d(filtered_input, gx)
    outy = filter2d(filtered_input, gx.t())
    return torch.cat([outx.unsqueeze(2), outy.unsqueeze(2)], dim=2)
def spatial_gradient_second_order(x: torch.Tensor, sigma: float) -> torch.Tensor:
    r"""Second-order derivatives (xx, xy, yy) by differentiating first-order gradients.

    Return:
        torch.Tensor: spatial gradients; dim 2 holds (xx, xy, yy).
    Shape:
        - Input: :math:`(B, C, H, W)`
        - Output: :math:`(B, C, 3, H, W)`
    """
    # (Unused `b, c, h, w` and `ksize` locals removed.)
    gx = torch.tensor([[0.5, 0, -0.5]]).to(x.device, x.dtype)
    first_order = spatial_gradient_first_order(x, sigma)
    # d/dx of dx -> xx, d/dy of dx -> xy, d/dy of dy -> yy.
    return torch.cat([filter2d(first_order[:, :, 0], gx).unsqueeze(2),
                      filter2d(first_order[:, :, 0], gx.t()).unsqueeze(2),
                      filter2d(first_order[:, :, 1], gx.t()).unsqueeze(2)], dim=2)
def spatial_gradient_second_order2(x: torch.Tensor, sigma: float) -> torch.Tensor:
    r"""Second-order derivatives (xx, xy, yy) using exact Gaussian-derivative kernels.

    Return:
        torch.Tensor: spatial gradients; dim 2 holds (xx, xy, yy).
    Shape:
        - Input: :math:`(B, C, H, W)`
        - Output: :math:`(B, C, 3, H, W)`
    """
    # (The original computed `b, c, h, w` and `ksize` twice; duplication removed.)
    ksize = get_gausskernel_size(sigma)
    kernel_inp = torch.linspace(-float(ksize // 2), float(ksize // 2), ksize)
    gx = gaussian_deriv1d(kernel_inp, sigma).reshape(1, -1)
    gx2 = gaussian_deriv1d_2nd(kernel_inp, sigma).reshape(1, -1)
    g = gaussian1d(kernel_inp, sigma).reshape(1, -1)
    # Outer products built in double precision to limit rounding error,
    # then cast back to float32.
    kernel_xy = torch.mm(gx.t().double(), gx.double()).float()
    kernel_x2 = torch.mm(gx2.double().t(), g.double()).t().float()
    return torch.cat([filter2d(x, kernel_x2).unsqueeze(2),
                      filter2d(x, kernel_xy).unsqueeze(2),
                      filter2d(x, kernel_x2.t()).unsqueeze(2)], dim=2)
def spatial_gradient_second_order3(x: torch.Tensor, sigma: float) -> torch.Tensor:
    r"""Second-order derivatives (xx, xy, yy) using separable 1D Gaussian kernels.

    Return:
        torch.Tensor: spatial gradients; dim 2 holds (xx, xy, yy).
    Shape:
        - Input: :math:`(B, C, H, W)`
        - Output: :math:`(B, C, 3, H, W)`
    """
    # (The original computed `b, c, h, w` and `ksize` twice; duplication removed.)
    ksize = get_gausskernel_size(sigma)
    kernel_inp = torch.linspace(-float(ksize // 2), float(ksize // 2), ksize)
    g = gaussian1d(kernel_inp, sigma).reshape(1, -1)
    gx = gaussian_deriv1d(kernel_inp, sigma).reshape(1, -1)
    gxx = gaussian_deriv1d_2nd(kernel_inp, sigma).reshape(1, -1)
    # xx: smooth in y then 2nd derivative in x; xy: 1st derivative in both;
    # yy: the mirrored combination.
    return torch.cat([filter2d(filter2d(x, g.t()), gxx).unsqueeze(2),
                      filter2d(filter2d(x, gx), gx.t()).unsqueeze(2),
                      filter2d(filter2d(x, g), gxx.t()).unsqueeze(2)], dim=2)
def spatial_gradient_second_order_fast(x: torch.Tensor, sigma: float) -> torch.Tensor:
    r"""Second-order derivatives (xx, xy, yy) from the fast first-order gradients.

    Return:
        torch.Tensor: spatial gradients; dim 2 holds (xx, xy, yy).
    Shape:
        - Input: :math:`(B, C, H, W)`
        - Output: :math:`(B, C, 3, H, W)`
    """
    # (Unused `b, c, h, w` and `ksize` locals removed.)
    gx = torch.tensor([[0.5, 0, -0.5]]).to(x.device, x.dtype)
    first_order = spatial_gradient_first_order_fast(x, sigma)
    # d/dx of dx -> xx, d/dy of dx -> xy, d/dy of dy -> yy.
    return torch.cat([filter2d(first_order[:, :, 0], gx).unsqueeze(2),
                      filter2d(first_order[:, :, 0], gx.t()).unsqueeze(2),
                      filter2d(first_order[:, :, 1], gx.t()).unsqueeze(2)], dim=2)