|
from typing import Dict, List, Any |
|
import torch |
|
from diffusers import DPMSolverMultistepScheduler, DiffusionPipeline, StableDiffusionImg2ImgPipeline, StableDiffusionInpaintPipelineLegacy |
|
from PIL import Image |
|
import base64 |
|
from io import BytesIO |
|
|
|
|
|
|
|
# Run on the GPU when torch can see one; otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
|
|
|
class EndpointHandler:
    """Serve text-to-image, image-to-image and inpainting from one checkpoint.

    All three pipelines are built from the same VAE, text encoder, tokenizer,
    UNet and scheduler objects, so the model weights are loaded only once.
    """

    def __init__(self, path=""):
        """Load the checkpoint and build the three pipelines.

        :param path: Location passed to ``DiffusionPipeline.from_pretrained``.
        """
        # Use half precision only on CUDA. float16 inference is generally not
        # usable on CPU, so fall back to float32 there.
        torch_dtype = torch.float16 if device.type == "cuda" else torch.float32

        self.txt2img_pipe = DiffusionPipeline.from_pretrained(path, torch_dtype=torch_dtype)
        self.txt2img_pipe.safety_checker = None
        self.txt2img_pipe.scheduler = DPMSolverMultistepScheduler.from_config(
            self.txt2img_pipe.scheduler.config
        )
        # Move the shared modules to the target device once, up front; the
        # pipelines below reuse these very same objects.
        self.txt2img_pipe.to(device)

        shared_components = {
            "vae": self.txt2img_pipe.vae,
            "text_encoder": self.txt2img_pipe.text_encoder,
            "tokenizer": self.txt2img_pipe.tokenizer,
            "unet": self.txt2img_pipe.unet,
            "scheduler": self.txt2img_pipe.scheduler,
            "safety_checker": self.txt2img_pipe.safety_checker,
            "feature_extractor": self.txt2img_pipe.feature_extractor,
        }
        self.img2img_pipe = StableDiffusionImg2ImgPipeline(**shared_components).to(device)
        self.inpaint_pipe = StableDiffusionInpaintPipelineLegacy(**shared_components).to(device)

    def __call__(self, data: Dict[str, Any]) -> "Image.Image":
        """Generate a single image for one request payload.

        The pipeline is chosen from the payload: ``image`` together with
        ``mask_image`` selects inpainting, ``image`` alone selects
        image-to-image, and anything else selects text-to-image (a
        ``mask_image`` sent without ``image`` is ignored).

        :param data: Request dictionary. Recognised keys are ``inputs`` (the
            prompt), ``image`` and ``mask_image`` (base64 strings),
            ``num_inference_steps`` (default 25), ``guidance_scale``
            (default 7.5), ``negative_prompt`` (default ``None``), ``height``
            and ``width`` (default 512, text-to-image only) and ``strength``
            (default 0.8, image modes only). Recognised keys are popped, so
            the dictionary is modified in place.
        :return: The first image produced by the selected pipeline
            (``out.images[0]``); a PIL image with the pipelines' default
            output type.
        """
        # When ``inputs`` is missing the whole payload is used as the prompt.
        prompt = data.pop("inputs", data)
        encoded_image = data.pop("image", None)
        encoded_mask_image = data.pop("mask_image", None)

        # Generation options shared by all three modes.
        common_kwargs = {
            "num_inference_steps": data.pop("num_inference_steps", 25),
            "guidance_scale": data.pop("guidance_scale", 7.5),
            "num_images_per_prompt": 1,
            "negative_prompt": data.pop("negative_prompt", None),
        }
        height = data.pop("height", 512)
        width = data.pop("width", 512)
        strength = data.pop("strength", 0.8)

        if encoded_image is None:
            out = self.txt2img_pipe(prompt, height=height, width=width, **common_kwargs)
        else:
            # NOTE(review): the image pipelines are called with ``init_image=``;
            # confirm this keyword matches the installed diffusers version.
            image = self.decode_base64_image(encoded_image)
            if encoded_mask_image is not None:
                mask_image = self.decode_base64_image(encoded_mask_image)
                out = self.inpaint_pipe(
                    prompt,
                    init_image=image,
                    mask_image=mask_image,
                    strength=strength,
                    **common_kwargs,
                )
            else:
                out = self.img2img_pipe(
                    prompt,
                    init_image=image,
                    strength=strength,
                    **common_kwargs,
                )

        return out.images[0]

    def decode_base64_image(self, image_string):
        """Decode a base64 string into an image.

        :param image_string: Base64-encoded contents of an image file.
        :return: The object returned by ``Image.open`` on the decoded bytes.
        """
        raw_bytes = base64.b64decode(image_string)
        return Image.open(BytesIO(raw_bytes))