Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
import base64 | |
import json | |
import mimetypes | |
import os | |
import uuid | |
from io import BytesIO | |
from typing import Optional | |
import requests | |
from dotenv import load_dotenv | |
from huggingface_hub import InferenceClient | |
from PIL import Image | |
from transformers import AutoProcessor | |
from smolagents import Tool, tool | |
load_dotenv(override=True) | |
idefics_processor = AutoProcessor.from_pretrained("HuggingFaceM4/idefics2-8b-chatty") | |
def process_images_and_text(image_path, query, client): | |
messages = [ | |
{ | |
"role": "user", | |
"content": [ | |
{"type": "image"}, | |
{"type": "text", "text": query}, | |
], | |
}, | |
] | |
prompt_with_template = idefics_processor.apply_chat_template(messages, add_generation_prompt=True) | |
# load images from local directory | |
# encode images to strings which can be sent to the endpoint | |
def encode_local_image(image_path): | |
# load image | |
image = Image.open(image_path).convert("RGB") | |
# Convert the image to a base64 string | |
buffer = BytesIO() | |
image.save(buffer, format="JPEG") # Use the appropriate format (e.g., JPEG, PNG) | |
base64_image = base64.b64encode(buffer.getvalue()).decode("utf-8") | |
# add string formatting required by the endpoint | |
image_string = f"data:image/jpeg;base64,{base64_image}" | |
return image_string | |
image_string = encode_local_image(image_path) | |
prompt_with_images = prompt_with_template.replace("<image>", "![]({}) ").format(image_string) | |
payload = { | |
"inputs": prompt_with_images, | |
"parameters": { | |
"return_full_text": False, | |
"max_new_tokens": 200, | |
}, | |
} | |
return json.loads(client.post(json=payload).decode())[0] | |
# Function to encode the image | |
def encode_image(image_path): | |
if image_path.startswith("http"): | |
user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0" | |
request_kwargs = { | |
"headers": {"User-Agent": user_agent}, | |
"stream": True, | |
} | |
# Send a HTTP request to the URL | |
response = requests.get(image_path, **request_kwargs) | |
response.raise_for_status() | |
content_type = response.headers.get("content-type", "") | |
extension = mimetypes.guess_extension(content_type) | |
if extension is None: | |
extension = ".download" | |
fname = str(uuid.uuid4()) + extension | |
download_path = os.path.abspath(os.path.join("downloads", fname)) | |
with open(download_path, "wb") as fh: | |
for chunk in response.iter_content(chunk_size=512): | |
fh.write(chunk) | |
image_path = download_path | |
with open(image_path, "rb") as image_file: | |
return base64.b64encode(image_file.read()).decode("utf-8") | |
headers = {"Content-Type": "application/json", "Authorization": f"Bearer {os.getenv('OPENAI_API_KEY')}"} | |
def resize_image(image_path): | |
img = Image.open(image_path) | |
width, height = img.size | |
img = img.resize((int(width / 2), int(height / 2))) | |
new_image_path = f"resized_{image_path}" | |
img.save(new_image_path) | |
return new_image_path | |
class VisualQATool(Tool): | |
name = "visualizer" | |
description = "A tool that can answer questions about attached images." | |
inputs = { | |
"image_path": { | |
"description": "The path to the image on which to answer the question", | |
"type": "string", | |
}, | |
"question": {"description": "the question to answer", "type": "string", "nullable": True}, | |
} | |
output_type = "string" | |
client = InferenceClient("HuggingFaceM4/idefics2-8b-chatty") | |
def forward(self, image_path: str, question: Optional[str] = None) -> str: | |
output = "" | |
add_note = False | |
if not question: | |
add_note = True | |
question = "Please write a detailed caption for this image." | |
try: | |
output = process_images_and_text(image_path, question, self.client) | |
except Exception as e: | |
print(e) | |
if "Payload Too Large" in str(e): | |
new_image_path = resize_image(image_path) | |
output = process_images_and_text(new_image_path, question, self.client) | |
if add_note: | |
output = ( | |
f"You did not provide a particular question, so here is a detailed caption for the image: {output}" | |
) | |
return output | |
def visualizer(image_path: str, question: Optional[str] = None) -> str: | |
"""A tool that can answer questions about attached images. | |
Args: | |
image_path: The path to the image on which to answer the question. This should be a local path to downloaded image. | |
question: The question to answer. | |
""" | |
add_note = False | |
if not question: | |
add_note = True | |
question = "Please write a detailed caption for this image." | |
if not isinstance(image_path, str): | |
raise Exception("You should provide at least `image_path` string argument to this tool!") | |
mime_type, _ = mimetypes.guess_type(image_path) | |
base64_image = encode_image(image_path) | |
payload = { | |
"model": "gpt-4o", | |
"messages": [ | |
{ | |
"role": "user", | |
"content": [ | |
{"type": "text", "text": question}, | |
{"type": "image_url", "image_url": {"url": f"data:{mime_type};base64,{base64_image}"}}, | |
], | |
} | |
], | |
"max_tokens": 1000, | |
} | |
response = requests.post("https://api.openai.com/v1/chat/completions", headers=headers, json=payload) | |
try: | |
output = response.json()["choices"][0]["message"]["content"] | |
except Exception: | |
raise Exception(f"Response format unexpected: {response.json()}") | |
if add_note: | |
output = f"You did not provide a particular question, so here is a detailed caption for the image: {output}" | |
return output | |