
FETCH SIMILAR IMAGE FROM S3 (USING AMAZON AURORA)

 

Prerequisites –

  1. An S3 bucket (create an S3 bucket to store the images)
  2. An Amazon Aurora PostgreSQL instance
  3. A local PostgreSQL installation with pgAdmin (to connect to the cluster)
  4. Lambda functions
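The S3 bucket can be created from the console or scripted with boto3. A minimal sketch, assuming the bucket name used later in the Lambda code (`finding-similar-images`) and the `us-east-1` region:

```python
BUCKET_NAME = "finding-similar-images"  # must match the name used in the Lambda functions

def create_bucket(bucket_name=BUCKET_NAME, region="us-east-1"):
    # boto3 is imported inside the function so the sketch reads standalone
    import boto3
    s3 = boto3.client("s3", region_name=region)
    # us-east-1 rejects an explicit LocationConstraint; other regions require one
    if region == "us-east-1":
        return s3.create_bucket(Bucket=bucket_name)
    return s3.create_bucket(
        Bucket=bucket_name,
        CreateBucketConfiguration={"LocationConstraint": region},
    )
```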

 

STEP-1

After creating the S3 bucket, proceed to create the Amazon Aurora PostgreSQL cluster.

  1. On the Aurora console, create a new cluster.
  2. For Engine options, select Aurora (PostgreSQL Compatible).
  3. For Engine version, choose your engine version.
  4. For Configuration options, select either Aurora Standard or Aurora I/O Optimized.

We selected Aurora I/O-Optimized, which provides improved performance with predictable pricing for I/O-intensive applications.

  5. For DB instance class, select your instance class.

We opted to use Amazon Aurora Serverless v2, which automatically scales your compute based on your application workload, so you only pay based on the capacity used.

The configuration is shown in the image below.
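The console steps above can also be scripted. Below is a hedged boto3 sketch; the cluster identifier, engine version, and capacity range are illustrative, and the Data API HTTP endpoint is enabled because the Lambda functions later in this post use the `rds-data` client:

```python
# Illustrative values; adjust the identifier, engine version, and capacity range.
CLUSTER_PARAMS = {
    "DBClusterIdentifier": "similar-images-cluster",
    "Engine": "aurora-postgresql",
    "EngineVersion": "15.4",
    "MasterUsername": "postgres",
    "ManageMasterUserPassword": True,   # let AWS store the password in Secrets Manager
    "ServerlessV2ScalingConfiguration": {"MinCapacity": 0.5, "MaxCapacity": 4.0},
    "EnableHttpEndpoint": True,         # RDS Data API, used by the Lambda functions
}

def create_cluster():
    import boto3
    rds = boto3.client("rds")
    cluster = rds.create_db_cluster(**CLUSTER_PARAMS)
    # A Serverless v2 cluster still needs at least one 'db.serverless' instance
    rds.create_db_instance(
        DBInstanceIdentifier="similar-images-instance",
        DBClusterIdentifier=CLUSTER_PARAMS["DBClusterIdentifier"],
        DBInstanceClass="db.serverless",
        Engine="aurora-postgresql",
    )
    return cluster
```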

STEP-2 – Set Up Postgres

  1. Install PostgreSQL on your system.
  2. Open pgAdmin 4 and click Add New Server.
  3. Give the server a name.
  4. Click Connection and enter the endpoint of the Aurora cluster instance in the Host name/address field.
  5. Modify the Amazon Aurora instance to allow public access.
  6. Enter the password and make sure you remember it.
  7. Click Save, and pgAdmin will connect to the Amazon Aurora cluster.

 

If you face the error shown in the image below:

 

Then go to the Amazon Aurora DB cluster and, under "Credential management", select "Self managed". Set a password and save it, then use the same password in pgAdmin; it will now connect successfully (shown in the image below).

 

After connecting successfully, create a database and then a table in Postgres. Create three columns: one for the image key, one for the image URL, and one to store the embedding vector (it should be of type text).
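As a sketch, the table could be created either in pgAdmin or via the RDS Data API. The table and column names below (`image_embeddings`, `image_key`, `image_url`, `image_embedding`) are the ones the Lambda functions later in this post assume:

```python
# Column names match the ones used by the Lambda functions below.
DDL = """
CREATE TABLE IF NOT EXISTS image_embeddings (
    image_key       TEXT PRIMARY KEY,
    image_url       TEXT,
    image_embedding TEXT NOT NULL
);
"""

def create_table(cluster_arn, secret_arn, db_name):
    import boto3
    rds_data = boto3.client("rds-data")
    return rds_data.execute_statement(
        resourceArn=cluster_arn,
        secretArn=secret_arn,
        database=db_name,
        sql=DDL,
    )
```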

STEP-3 – How to Embed Images

For embedding, use Amazon Bedrock:

  1. Open the Amazon Bedrock console and select Titan.
  2. Click Request Model Access, then select "Titan Multimodal Embeddings G1".

Now click Base Models under Foundation Models, scroll down to "Titan Multimodal Embeddings G1", and copy the Model ID to use in the Lambda functions for image embedding.

STEP-4 – Write the Lambda Functions

  1. First, write a Lambda function to embed the existing images in your S3 bucket.
    Below is the Python code –

import json
import boto3
import base64
import os

# Initialize the AWS clients
s3 = boto3.client('s3')
bedrock_runtime = boto3.client('bedrock-runtime')
rds_data = boto3.client('rds-data')

# Database connection parameters from environment variables
DB_CLUSTER_ARN = os.environ['DB_CLUSTER_ARN']
DB_SECRET_ARN = os.environ['DB_SECRET_ARN']
DB_NAME = os.environ['DB_NAME']

def get_image_embedding(image_data):
    # Base64-encode the image for the Bedrock API
    input_image = base64.b64encode(image_data).decode('utf-8')

    body = {
        "inputImage": input_image
    }

    response = bedrock_runtime.invoke_model(
        body=json.dumps(body),
        modelId="amazon.titan-embed-image-v1",
        accept="application/json",
        contentType="application/json"
    )

    response_body = json.loads(response['body'].read().decode('utf-8'))
    embedding = response_body.get('embedding')

    if embedding is None:
        raise Exception(f"Failed to get embedding: {response_body}")

    return embedding

def store_embedding_in_db(embedding, image_key):
    embedding_vector_json = json.dumps(embedding)

    sql = """
    INSERT INTO image_embeddings (image_key, image_embedding)
    VALUES (:image_key, :image_embedding)
    """

    response = rds_data.execute_statement(
        secretArn=DB_SECRET_ARN,
        resourceArn=DB_CLUSTER_ARN,
        database=DB_NAME,
        sql=sql,
        parameters=[
            {'name': 'image_key', 'value': {'stringValue': image_key}},
            {'name': 'image_embedding', 'value': {'stringValue': embedding_vector_json}}
        ]
    )
    return response

def image_exists_in_db(image_key):
    sql = """
    SELECT COUNT(*) AS count FROM image_embeddings WHERE image_key = :image_key
    """

    response = rds_data.execute_statement(
        secretArn=DB_SECRET_ARN,
        resourceArn=DB_CLUSTER_ARN,
        database=DB_NAME,
        sql=sql,
        parameters=[
            {'name': 'image_key', 'value': {'stringValue': image_key}}
        ]
    )

    count = response['records'][0][0]['longValue']
    return count > 0

def lambda_handler(event, context):
    try:
        print(f"Event received: {event}")

        # List all objects in the S3 bucket
        bucket_name = 'finding-similar-images'  # Replace with your S3 bucket name
        response = s3.list_objects_v2(Bucket=bucket_name)

        if 'Contents' not in response:
            return {
                'statusCode': 200,
                'body': json.dumps({'message': 'No images found in S3 bucket'})
            }

        for obj in response['Contents']:
            key = obj['Key']
            print(f"Processing image key: {key}")

            if not image_exists_in_db(key):
                print(f"Image key {key} not found in database, generating embedding...")

                # Download the image from S3
                s3_response = s3.get_object(Bucket=bucket_name, Key=key)
                image_data = s3_response['Body'].read()

                # Get the image embedding from Amazon Bedrock
                embedding = get_image_embedding(image_data)

                # Store the embedding in the database
                store_embedding_in_db(embedding, key)

        return {
            'statusCode': 200,
            'body': json.dumps({'message': 'Embeddings updated successfully'})
        }

    except Exception as e:
        print(f"Error processing request: {str(e)}")
        return {
            'statusCode': 500,
            'body': json.dumps({'message': 'Error processing request', 'error': str(e)})
        }

 

  2. Lambda function to store an image in S3, embed it, and store the embedding vector in the database.

The code –

import json
import boto3
import base64
import os

# Initialize the AWS clients
s3 = boto3.client('s3')
bedrock_runtime = boto3.client('bedrock-runtime')
rds_data = boto3.client('rds-data')

# Database connection parameters from environment variables
DB_CLUSTER_ARN = os.environ['DB_CLUSTER_ARN']
DB_SECRET_ARN = os.environ['DB_SECRET_ARN']
DB_NAME = os.environ['DB_NAME']

def get_image_embedding(image_data):
    # Prepare the payload for the Bedrock API
    input_image = base64.b64encode(image_data).decode('utf-8')

    body = {
        "inputImage": input_image
    }

    # Invoke the Bedrock model
    response = bedrock_runtime.invoke_model(
        body=json.dumps(body),
        modelId="amazon.titan-embed-image-v1",  # Replace with your model ID
        accept="application/json",
        contentType="application/json"
    )

    # Read the response and extract the embedding
    response_body = json.loads(response['body'].read().decode('utf-8'))
    embedding = response_body.get('embedding')

    if embedding is None:
        raise Exception(f"Failed to get embedding: {response_body}")

    return embedding

def store_embedding_in_db(embedding, image_key, image_url):
    # Convert the embedding list to a JSON string
    embedding_vector_json = json.dumps(embedding)

    # SQL statement to insert the data
    sql = """
    INSERT INTO image_embeddings (image_key, image_embedding, image_url)
    VALUES (:image_key, :image_embedding, :image_url)
    """

    # Execute the SQL statement using the RDS Data API
    response = rds_data.execute_statement(
        secretArn=DB_SECRET_ARN,
        resourceArn=DB_CLUSTER_ARN,
        database=DB_NAME,
        sql=sql,
        parameters=[
            {'name': 'image_key', 'value': {'stringValue': image_key}},
            {'name': 'image_embedding', 'value': {'stringValue': embedding_vector_json}},
            {'name': 'image_url', 'value': {'stringValue': image_url}}
        ]
    )
    print(f"Response from RDS Data API: {json.dumps(response, indent=2)}")
    return response

def lambda_handler(event, context):
    try:
        print(f"Event received: {event}")

        # Extract the body from the event
        body = json.loads(event['body'])

        # Extract the file name and base64 content from the body
        file_name = body['fileName']
        file_content = body['fileContent']

        # Decode the base64 content
        decoded_file_content = base64.b64decode(file_content)

        # Specify the S3 bucket name
        bucket_name = 'finding-similar-images'

        # Upload the decoded file to S3
        s3.put_object(
            Bucket=bucket_name,
            Key=file_name,
            Body=decoded_file_content,
            ContentType='image/jpeg'
        )

        # Generate a pre-signed URL for the uploaded image
        presigned_url = s3.generate_presigned_url(
            'get_object',
            Params={'Bucket': bucket_name, 'Key': file_name},
            ExpiresIn=3600
        )

        # Get the image embedding from Amazon Bedrock
        embedding = get_image_embedding(decoded_file_content)

        print(f"Image embedding: {embedding}")

        # Store the embedding and URL in the database
        store_embedding_in_db(embedding, file_name, presigned_url)

        return {
            'statusCode': 200,
            'body': json.dumps({
                'message': 'Image uploaded and embedding stored successfully',
                'embedding': embedding,
                'imageUrl': presigned_url
            })
        }
    except Exception as e:
        print(e)
        return {
            'statusCode': 500,
            'body': json.dumps({'message': 'Failed to upload image and store embedding', 'error': str(e)})
        }
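A client calling this function (for example through API Gateway) must send the file name and the base64-encoded file content. The sketch below builds such a request body; the field names `fileName` and `fileContent` match what the handler above reads from `event['body']`:

```python
import base64
import json

def build_upload_body(file_name, image_bytes):
    # Field names must match what the Lambda handler reads from event['body']
    return json.dumps({
        "fileName": file_name,
        "fileContent": base64.b64encode(image_bytes).decode("utf-8"),
    })

body = build_upload_body("cat.jpg", b"\xff\xd8\xff\xe0fake-jpeg-bytes")
# The handler recovers the original bytes with base64.b64decode
decoded = base64.b64decode(json.loads(body)["fileContent"])
```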

 

  3. Lambda function to fetch similar images when the user uploads an image.

import json
import boto3
import base64
import os
import math

# Initialize the AWS clients
s3 = boto3.client('s3')
bedrock_runtime = boto3.client('bedrock-runtime')
rds_data = boto3.client('rds-data')

# Database connection parameters from environment variables
DB_CLUSTER_ARN = os.environ['DB_CLUSTER_ARN']
DB_SECRET_ARN = os.environ['DB_SECRET_ARN']
DB_NAME = os.environ['DB_NAME']
S3_BUCKET_URL = os.environ['S3_BUCKET_URL']

def calculate_cosine_similarity(vec1, vec2):
    # Compute the dot product
    dot_product = sum(v1 * v2 for v1, v2 in zip(vec1, vec2))

    # Compute the magnitudes
    magnitude1 = math.sqrt(sum(v1 * v1 for v1 in vec1))
    magnitude2 = math.sqrt(sum(v2 * v2 for v2 in vec2))

    if magnitude1 == 0 or magnitude2 == 0:
        return 0  # Avoid division by zero

    # Compute cosine similarity
    return dot_product / (magnitude1 * magnitude2)

def find_similar_images(embedding, top_n=20):
    # SQL query to fetch all embeddings
    sql = "SELECT image_key, image_embedding, image_url FROM image_embeddings"

    # Execute the SQL statement using the RDS Data API
    response = rds_data.execute_statement(
        secretArn=DB_SECRET_ARN,
        resourceArn=DB_CLUSTER_ARN,
        database=DB_NAME,
        sql=sql
    )

    # Calculate similarity for each embedding and sort by similarity
    similarities = []
    for record in response['records']:
        image_key = record[0]['stringValue']
        image_embedding = json.loads(record[1]['stringValue'])
        image_url = record[2]['stringValue']
        similarity = calculate_cosine_similarity(embedding, image_embedding)
        similarities.append((image_key, similarity, image_url))

    # Sort by similarity in descending order
    similarities.sort(key=lambda x: x[1], reverse=True)

    # Return the top N similar images
    return similarities[:top_n]

def lambda_handler(event, context):
    try:
        print(f"Event received: {event}")

        body = json.loads(event['body'])
        input_embedding = body.get('embedding')

        if input_embedding is None:
            return {
                'statusCode': 400,
                'body': json.dumps({'message': 'Invalid input: embedding is required'})
            }

        # Find similar images
        similar_images = find_similar_images(input_embedding)
        similar_images_response = [{'image_key': key, 'similarity': sim, 'image_url': url} for key, sim, url in similar_images]

        return {
            'statusCode': 200,
            'body': json.dumps({'similar_images': similar_images_response})
        }

    except Exception as e:
        print(f"Error processing request: {str(e)}")
        return {
            'statusCode': 500,
            'body': json.dumps({'message': 'Error processing request', 'error': str(e)})
        }
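The ranking logic can be sanity-checked locally without any AWS setup. The function below uses the same cosine formula as `calculate_cosine_similarity` in the Lambda above: identical directions score 1.0 and orthogonal vectors score 0.0:

```python
import math

def cosine(vec1, vec2):
    # Same formula as calculate_cosine_similarity in the Lambda function
    dot = sum(a * b for a, b in zip(vec1, vec2))
    m1 = math.sqrt(sum(a * a for a in vec1))
    m2 = math.sqrt(sum(b * b for b in vec2))
    if m1 == 0 or m2 == 0:
        return 0  # Avoid division by zero
    return dot / (m1 * m2)

print(cosine([1.0, 0.0], [1.0, 0.0]))  # identical direction -> 1.0
print(cosine([1.0, 0.0], [0.0, 1.0]))  # orthogonal -> 0.0
```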

 

Now that all the setup is done, you can configure Amazon API Gateway to invoke the Lambda functions that store image embeddings and fetch similar images.