FETCH SIMILAR IMAGES FROM S3 (USING AMAZON AURORA)
Prerequisites –
STEP-1 – Create the Aurora PostgreSQL cluster
After creating the S3 bucket, proceed to create an Amazon Aurora PostgreSQL cluster.
We selected Aurora I/O-Optimized, which provides improved performance with predictable pricing for I/O-intensive applications.
We opted to use Amazon Aurora Serverless v2, which automatically scales compute capacity with your application workload, so you only pay for the capacity you use (shown in the image below).
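If you prefer to script this step rather than click through the console, a minimal sketch with boto3 is shown below. The cluster identifier, database name, engine version, and password are placeholders (assumptions, not values from this walkthrough), so adjust them to your environment.
import boto3

rds = boto3.client('rds')

# Create an Aurora PostgreSQL cluster with I/O-Optimized storage,
# Serverless v2 scaling, and the Data API (HTTP endpoint) enabled.
rds.create_db_cluster(
    DBClusterIdentifier='similar-images-cluster',  # hypothetical identifier
    Engine='aurora-postgresql',
    EngineVersion='15.4',  # pick a version supported in your region
    StorageType='aurora-iopt1',  # Aurora I/O-Optimized
    ServerlessV2ScalingConfiguration={'MinCapacity': 0.5, 'MaxCapacity': 4},
    EnableHttpEndpoint=True,  # required for the RDS Data API used by the Lambdas
    MasterUsername='postgres',
    MasterUserPassword='change-me',  # self-managed password, reused in Step 2
    DatabaseName='imagesdb'  # hypothetical database name
)

# Aurora Serverless v2 also needs at least one instance of class db.serverless.
rds.create_db_instance(
    DBInstanceIdentifier='similar-images-instance',  # hypothetical identifier
    DBClusterIdentifier='similar-images-cluster',
    DBInstanceClass='db.serverless',
    Engine='aurora-postgresql'
)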
STEP-2 – Set up Postgres
If you face the error shown in the image below, go to the DB cluster in Amazon Aurora and, under "Credential management", select "Self managed", set a password, and save it. Use the same password in Postgres, and it will now connect successfully (shown in the image below).
After connecting successfully, create a database and then a table in Postgres. Give the table three columns: one for the image key, one for the image URL, and one to store the embedding vector (of type 'text').
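For reference, here is a minimal sketch of that table creation using the RDS Data API from Python. The cluster ARN, secret ARN, and database name are placeholders; the table and column names match the ones used by the Lambda functions below.
import boto3

rds_data = boto3.client('rds-data')

# Placeholders – use your own cluster ARN, secret ARN, and database name.
DB_CLUSTER_ARN = 'arn:aws:rds:us-east-1:123456789012:cluster:similar-images-cluster'
DB_SECRET_ARN = 'arn:aws:secretsmanager:us-east-1:123456789012:secret:similar-images-secret'
DB_NAME = 'imagesdb'

# One row per image: the S3 key, a (pre-signed) URL, and the embedding
# stored as a JSON string in a text column.
create_table_sql = """
    CREATE TABLE IF NOT EXISTS image_embeddings (
        image_key       text,
        image_url       text,
        image_embedding text
    )
"""

rds_data.execute_statement(
    resourceArn=DB_CLUSTER_ARN,
    secretArn=DB_SECRET_ARN,
    database=DB_NAME,
    sql=create_table_sql
)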
STEP-3 – How to Embed Images
For embedding, use Amazon Bedrock.
Click 'Base Models' under 'Foundation Models', scroll down to "Titan Multimodal Embeddings G1", and copy the Model ID to use in the Lambda functions for image embedding.
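If you would rather look up the Model ID programmatically than copy it from the console, a small sketch like the following should work; the model name string is the one shown in the console.
import boto3

# The 'bedrock' client (not 'bedrock-runtime') exposes the model catalogue.
bedrock = boto3.client('bedrock')

models = bedrock.list_foundation_models()
for model in models['modelSummaries']:
    if 'Titan Multimodal Embeddings' in model['modelName']:
        print(model['modelName'], '->', model['modelId'])
# Expected output includes: Titan Multimodal Embeddings G1 -> amazon.titan-embed-image-v1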
STEP-4 – Write the Lambda functions.
Three Lambda functions are used: the first backfills embeddings for images already in the S3 bucket, the second uploads a new image and stores its embedding, and the third fetches the most similar images.
The first Lambda function – backfill embeddings for existing images:
import json
import boto3
import base64
import os

# Initialize the S3 client
s3 = boto3.client('s3')
# Initialize the Bedrock runtime client
bedrock_runtime = boto3.client('bedrock-runtime')
# Initialize the RDS Data API client
rds_data = boto3.client('rds-data')

# Database connection parameters from environment variables
DB_CLUSTER_ARN = os.environ['DB_CLUSTER_ARN']
DB_SECRET_ARN = os.environ['DB_SECRET_ARN']
DB_NAME = os.environ['DB_NAME']

def get_image_embedding(image_data):
    input_image = base64.b64encode(image_data).decode('utf-8')
    body = {
        "inputImage": input_image
    }
    response = bedrock_runtime.invoke_model(
        body=json.dumps(body),
        modelId="amazon.titan-embed-image-v1",
        accept="application/json",
        contentType="application/json"
    )
    response_body = json.loads(response['body'].read().decode('utf-8'))
    embedding = response_body.get('embedding')
    if embedding is None:
        raise Exception(f"Failed to get embedding: {response_body}")
    return embedding

def store_embedding_in_db(embedding, image_key):
    embedding_vector_json = json.dumps(embedding)
    sql = """
        INSERT INTO image_embeddings (image_key, image_embedding)
        VALUES (:image_key, :image_embedding)
    """
    response = rds_data.execute_statement(
        secretArn=DB_SECRET_ARN,
        resourceArn=DB_CLUSTER_ARN,
        database=DB_NAME,
        sql=sql,
        parameters=[
            {'name': 'image_key', 'value': {'stringValue': image_key}},
            {'name': 'image_embedding', 'value': {'stringValue': embedding_vector_json}}
        ]
    )
    return response

def image_exists_in_db(image_key):
    sql = """
        SELECT COUNT(*) AS count FROM image_embeddings WHERE image_key = :image_key
    """
    response = rds_data.execute_statement(
        secretArn=DB_SECRET_ARN,
        resourceArn=DB_CLUSTER_ARN,
        database=DB_NAME,
        sql=sql,
        parameters=[
            {'name': 'image_key', 'value': {'stringValue': image_key}}
        ]
    )
    count = response['records'][0][0]['longValue']
    return count > 0

def lambda_handler(event, context):
    try:
        print(f"Event received: {event}")
        # List all objects in the S3 bucket
        bucket_name = 'finding-similar-images'  # Replace with your S3 bucket name
        response = s3.list_objects_v2(Bucket=bucket_name)
        if 'Contents' not in response:
            return {
                'statusCode': 200,
                'body': json.dumps({'message': 'No images found in S3 bucket'})
            }
        for obj in response['Contents']:
            key = obj['Key']
            print(f"Processing image key: {key}")
            if not image_exists_in_db(key):
                print(f"Image key {key} not found in database, generating embedding...")
                # Download the image from S3
                s3_response = s3.get_object(Bucket=bucket_name, Key=key)
                image_data = s3_response['Body'].read()
                # Get the image embedding from Amazon Bedrock
                embedding = get_image_embedding(image_data)
                # Store the embedding in the database
                store_embedding_in_db(embedding, key)
        return {
            'statusCode': 200,
            'body': json.dumps({'message': 'Embeddings updated successfully'})
        }
    except Exception as e:
        print(f"Error processing request: {str(e)}")
        return {
            'statusCode': 500,
            'body': json.dumps({'message': 'Error processing request', 'error': str(e)})
        }
The second Lambda function – upload an image and store its embedding:
import json
import boto3
import base64
import os

# Initialize the S3 client
s3 = boto3.client('s3')
# Initialize the Bedrock runtime client
bedrock_runtime = boto3.client('bedrock-runtime')
# Initialize the RDS Data API client
rds_data = boto3.client('rds-data')

# Database connection parameters from environment variables
DB_CLUSTER_ARN = os.environ['DB_CLUSTER_ARN']
DB_SECRET_ARN = os.environ['DB_SECRET_ARN']
DB_NAME = os.environ['DB_NAME']

def get_image_embedding(image_data):
    # Prepare the payload for the Bedrock API
    input_image = base64.b64encode(image_data).decode('utf-8')
    body = {
        "inputImage": input_image
    }
    # Invoke the Bedrock model
    response = bedrock_runtime.invoke_model(
        body=json.dumps(body),
        modelId="amazon.titan-embed-image-v1",  # Replace with your model ID
        accept="application/json",
        contentType="application/json"
    )
    # Read the response and extract the embedding
    response_body = json.loads(response['body'].read().decode('utf-8'))
    embedding = response_body.get('embedding')
    if embedding is None:
        raise Exception(f"Failed to get embedding: {response_body}")
    return embedding

def store_embedding_in_db(embedding, image_key, image_url):
    # Convert the embedding list to a JSON string
    embedding_vector_json = json.dumps(embedding)
    # SQL statement to insert the data
    sql = """
        INSERT INTO image_embeddings (image_key, image_embedding, image_url)
        VALUES (:image_key, :image_embedding, :image_url)
    """
    # Execute the SQL statement using the RDS Data API
    response = rds_data.execute_statement(
        secretArn=DB_SECRET_ARN,
        resourceArn=DB_CLUSTER_ARN,
        database=DB_NAME,
        sql=sql,
        parameters=[
            {'name': 'image_key', 'value': {'stringValue': image_key}},
            {'name': 'image_embedding', 'value': {'stringValue': embedding_vector_json}},
            {'name': 'image_url', 'value': {'stringValue': image_url}}
        ]
    )
    print(f"Response from RDS Data API: {json.dumps(response, indent=2)}")
    return response

def lambda_handler(event, context):
    try:
        print(f"Event received: {event}")
        # Extract the body from the event
        body = json.loads(event['body'])
        # Extract the file name and base64 content from the body
        file_name = body['fileName']
        file_content = body['fileContent']
        # Decode the base64 content
        decoded_file_content = base64.b64decode(file_content)
        # Specify the S3 bucket name
        bucket_name = 'finding-similar-images'
        # Upload the decoded file to S3
        s3.put_object(
            Bucket=bucket_name,
            Key=file_name,
            Body=decoded_file_content,
            ContentType='image/jpeg'
        )
        # Generate a pre-signed URL for the uploaded image
        presigned_url = s3.generate_presigned_url(
            'get_object',
            Params={'Bucket': bucket_name, 'Key': file_name},
            ExpiresIn=3600
        )
        # Get the image embedding from Amazon Bedrock
        embedding = get_image_embedding(decoded_file_content)
        print(f"Image embedding: {embedding}")
        # Store the embedding and URL in the database
        store_embedding_in_db(embedding, file_name, presigned_url)
        return {
            'statusCode': 200,
            'body': json.dumps({
                'message': 'Image uploaded and embedding stored successfully',
                'embedding': embedding,
                'imageUrl': presigned_url
            })
        }
    except Exception as e:
        print(e)
        return {
            'statusCode': 500,
            'body': json.dumps({'message': 'Failed to upload image and store embedding', 'error': str(e)})
        }
The third Lambda function – fetch the most similar images using cosine similarity:
import json
import boto3
import base64
import os
import math

# Initialize the S3 client
s3 = boto3.client('s3')
# Initialize the Bedrock runtime client
bedrock_runtime = boto3.client('bedrock-runtime')
# Initialize the RDS Data API client
rds_data = boto3.client('rds-data')

# Database connection parameters from environment variables
DB_CLUSTER_ARN = os.environ['DB_CLUSTER_ARN']
DB_SECRET_ARN = os.environ['DB_SECRET_ARN']
DB_NAME = os.environ['DB_NAME']
S3_BUCKET_URL = os.environ['S3_BUCKET_URL']

def calculate_cosine_similarity(vec1, vec2):
    # Compute the dot product
    dot_product = sum(v1 * v2 for v1, v2 in zip(vec1, vec2))
    # Compute the magnitudes
    magnitude1 = math.sqrt(sum(v1 * v1 for v1 in vec1))
    magnitude2 = math.sqrt(sum(v2 * v2 for v2 in vec2))
    if magnitude1 == 0 or magnitude2 == 0:
        return 0  # Avoid division by zero
    # Compute cosine similarity
    return dot_product / (magnitude1 * magnitude2)

def find_similar_images(embedding, top_n=20):
    # SQL query to fetch all embeddings
    sql = "SELECT image_key, image_embedding, image_url FROM image_embeddings"
    # Execute the SQL statement using the RDS Data API
    response = rds_data.execute_statement(
        secretArn=DB_SECRET_ARN,
        resourceArn=DB_CLUSTER_ARN,
        database=DB_NAME,
        sql=sql
    )
    # Calculate similarity for each embedding and sort by similarity
    similarities = []
    for record in response['records']:
        image_key = record[0]['stringValue']
        image_embedding = json.loads(record[1]['stringValue'])
        image_url = record[2]['stringValue']
        similarity = calculate_cosine_similarity(embedding, image_embedding)
        similarities.append((image_key, similarity, image_url))
    # Sort by similarity in descending order
    similarities.sort(key=lambda x: x[1], reverse=True)
    # Return the top N similar images
    return similarities[:top_n]

def lambda_handler(event, context):
    try:
        print(f"Event received: {event}")
        body = json.loads(event['body'])
        input_embedding = body.get('embedding')
        if input_embedding is None:
            return {
                'statusCode': 400,
                'body': json.dumps({'message': 'Invalid input: embedding is required'})
            }
        # Find similar images
        similar_images = find_similar_images(input_embedding)
        similar_images_response = [
            {'image_key': key, 'similarity': sim, 'image_url': url}
            for key, sim, url in similar_images
        ]
        return {
            'statusCode': 200,
            'body': json.dumps({'similar_images': similar_images_response})
        }
    except Exception as e:
        print(f"Error processing request: {str(e)}")
        return {
            'statusCode': 500,
            'body': json.dumps({'message': 'Error processing request', 'error': str(e)})
        }
Now that the setup is complete, you can configure API Gateway to invoke these Lambda functions, storing image embeddings and fetching similar images through HTTP endpoints.
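As an illustration only, an end-to-end call through API Gateway (with Lambda proxy integration) could look like the sketch below. The endpoint URLs and route names are hypothetical; the payload shapes match what the upload and similarity Lambdas above expect.
import base64
import requests

API_BASE = 'https://your-api-id.execute-api.us-east-1.amazonaws.com/prod'  # hypothetical

# 1. Upload an image: the upload Lambda expects fileName and base64 fileContent.
with open('query.jpg', 'rb') as f:
    payload = {
        'fileName': 'query.jpg',
        'fileContent': base64.b64encode(f.read()).decode('utf-8')
    }
upload_resp = requests.post(f'{API_BASE}/upload', json=payload).json()

# 2. Use the returned embedding to ask for the most similar stored images.
similar_resp = requests.post(
    f'{API_BASE}/similar',
    json={'embedding': upload_resp['embedding']}
).json()

for item in similar_resp['similar_images']:
    print(item['image_key'], round(item['similarity'], 4), item['image_url'])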