| """ | |
| Script to create a circular mask and process images with ComfyUI. | |
| Usage: python add_sphere.py <input_image_path> [input_image_path2] [...] | |
| Dependencies: | |
| pip install websocket-client numpy pillow | |
| """ | |
| import sys | |
| import json | |
| import uuid | |
| import io | |
| import urllib.request | |
| import random | |
| import websocket | |
| import numpy as np | |
| from pathlib import Path | |
| from PIL import Image | |
| # ============================================================ | |
| # CONFIGURABLE PARAMETERS | |
| # ============================================================ | |
| # Mask circle position in image (relative, 0.0 to 1.0) | |
| # (0.5, 0.5) = center of image | |
| MASK_CENTER_X = 0.5 | |
| MASK_CENTER_Y = 0.65 | |
| # Mask circle radius as fraction of the smaller image dimension | |
| MASK_RADIUS_FRACTION = 0.22 | |
| # ComfyUI server address | |
| COMFY_SERVER_ADDRESS = "127.0.0.1:8000" | |
| # ============================================================ | |
| # END CONFIGURABLE PARAMETERS | |
| # ============================================================ | |
| def create_circular_mask(width, height): | |
| """ | |
| Create a circular mask. | |
| Returns mask as numpy array (255 inside circle, 0 outside). | |
| """ | |
| # Calculate circle parameters in pixel coordinates | |
| cx = int(width * MASK_CENTER_X) | |
| cy = int(height * MASK_CENTER_Y) | |
| radius = int(min(width, height) * MASK_RADIUS_FRACTION) | |
| # Create coordinate grids | |
| y_coords, x_coords = np.ogrid[:height, :width] | |
| # Calculate distance from center for each pixel | |
| dx = x_coords - cx | |
| dy = y_coords - cy | |
| dist_sq = dx**2 + dy**2 | |
| radius_sq = radius**2 | |
| # Create mask (255 inside circle, 0 outside) | |
| mask = (dist_sq <= radius_sq).astype(np.uint8) * 255 | |
| return mask | |
| def upload_image_to_comfy(image_data, filename, server_address): | |
| """Upload an image to ComfyUI server.""" | |
| boundary = '----WebKitFormBoundary' + uuid.uuid4().hex[:16] | |
| body = b'' | |
| body += f'--{boundary}\r\n'.encode() | |
| body += f'Content-Disposition: form-data; name="image"; filename="{filename}"\r\n'.encode() | |
| body += b'Content-Type: image/png\r\n\r\n' | |
| body += image_data | |
| body += b'\r\n' | |
| body += f'--{boundary}\r\n'.encode() | |
| body += b'Content-Disposition: form-data; name="type"\r\n\r\n' | |
| body += b'input' | |
| body += b'\r\n' | |
| body += f'--{boundary}\r\n'.encode() | |
| body += b'Content-Disposition: form-data; name="overwrite"\r\n\r\n' | |
| body += b'true' | |
| body += b'\r\n' | |
| body += f'--{boundary}--\r\n'.encode() | |
| headers = { | |
| 'Content-Type': f'multipart/form-data; boundary={boundary}' | |
| } | |
| req = urllib.request.Request( | |
| f"http://{server_address}/upload/image", | |
| data=body, | |
| headers=headers, | |
| method='POST' | |
| ) | |
| response = urllib.request.urlopen(req) | |
| return json.loads(response.read()) | |
| def queue_prompt(prompt, client_id, server_address): | |
| """Queue a prompt to ComfyUI.""" | |
| p = {"prompt": prompt, "client_id": client_id} | |
| data = json.dumps(p).encode('utf-8') | |
| req = urllib.request.Request(f"http://{server_address}/prompt", data=data) | |
| return json.loads(urllib.request.urlopen(req).read()) | |
| def get_images_from_websocket(ws, prompt, client_id, server_address): | |
| """Get images from ComfyUI via websocket.""" | |
| prompt_id = queue_prompt(prompt, client_id, server_address)['prompt_id'] | |
| output_images = {} | |
| current_node = "" | |
| while True: | |
| out = ws.recv() | |
| if isinstance(out, str): | |
| message = json.loads(out) | |
| if message['type'] == 'executing': | |
| data = message['data'] | |
| if data['prompt_id'] == prompt_id: | |
| if data['node'] is None: | |
| break # Execution is done | |
| else: | |
| current_node = data['node'] | |
| else: | |
| # Binary data - image from SaveImageWebsocket | |
| if current_node: | |
| images_output = output_images.get(current_node, []) | |
| images_output.append(out[8:]) # Skip header | |
| output_images[current_node] = images_output | |
| return output_images | |
| def load_workflow_and_set_image(image_filename): | |
| """ | |
| Load the workflow from fill_sphere.json (API format) and set the input image. | |
| The workflow is already in API format, so we just need to replace the image filename. | |
| """ | |
| # Load the workflow template (already in API format) | |
| with open('fill_sphere.json', 'r', encoding='utf-8') as f: | |
| prompt = json.load(f) | |
| # Set the input image | |
| prompt["17"]["inputs"]["image"] = image_filename | |
| print(f"Set LoadImage node to use: {image_filename}") | |
| # Set seed to a random number | |
| #prompt["47:3"]["inputs"]["seed"] = random.randint(0, 1000000000000000) | |
| prompt["47:3"]["inputs"]["seed"] = 608313082563526 | |
| print(f"Set seed to {prompt['47:3']['inputs']['seed']}") | |
| return prompt | |
| def process_image(input_image_path, ws, client_id): | |
| """Process a single image through the ComfyUI workflow.""" | |
| # Load the input image | |
| print(f"Loading image: {input_image_path}") | |
| base_image = Image.open(input_image_path) | |
| width, height = base_image.size | |
| print(f"Original image size: {width}x{height}") | |
| # Resize so maximum edge is 1024 | |
| max_size = 1024 | |
| if max(width, height) > max_size: | |
| scale = max_size / max(width, height) | |
| new_width = int(width * scale) | |
| new_height = int(height * scale) | |
| base_image = base_image.resize((new_width, new_height), Image.Resampling.LANCZOS) | |
| width, height = new_width, new_height | |
| print(f"Resized to: {width}x{height}") | |
| # Create circular mask | |
| print("Creating circular mask...") | |
| mask = create_circular_mask(width, height) | |
| # Create image with mask in alpha channel for ComfyUI | |
| # ComfyUI expects mask where alpha=0 means "inpaint this area", alpha=255 means "keep" | |
| print("Preparing image with mask for ComfyUI...") | |
| image_with_mask = base_image.convert('RGBA') | |
| img_array = np.array(image_with_mask) | |
| # Invert mask for ComfyUI (0=inpaint, 255=keep) | |
| img_array[:, :, 3] = 255 - mask | |
| image_with_mask_final = Image.fromarray(img_array) | |
| # Save to bytes | |
| img_byte_arr = io.BytesIO() | |
| image_with_mask_final.save(img_byte_arr, format='PNG') | |
| img_bytes = img_byte_arr.getvalue() | |
| # Upload to ComfyUI | |
| print(f"Uploading image to ComfyUI server at {COMFY_SERVER_ADDRESS}...") | |
| upload_result = upload_image_to_comfy(img_bytes, "masked_input.png", COMFY_SERVER_ADDRESS) | |
| masked_filename = upload_result.get('name', 'masked_input.png') | |
| print(f"Uploaded masked image as: {masked_filename}") | |
| # Load workflow from fill_sphere.json and set the input image | |
| print("Loading workflow from fill_sphere.json...") | |
| prompt = load_workflow_and_set_image(masked_filename) | |
| print("Queueing prompt and waiting for result...") | |
| images = get_images_from_websocket(ws, prompt, client_id, COMFY_SERVER_ADDRESS) | |
| # Save the output image | |
| print("Processing results...") | |
| output_saved = False | |
| for node_id in images: | |
| for image_data in images[node_id]: | |
| image = Image.open(io.BytesIO(image_data)) | |
| input_path = Path(input_image_path) | |
| output_filename = input_path.with_name(f"{input_path.stem}_with_sphere.jpg") | |
| image.save(output_filename, 'jpeg', quality=90) | |
| print(f"Saved output to {output_filename}") | |
| output_saved = True | |
| break | |
| if output_saved: | |
| break | |
| if not output_saved: | |
| print("Warning: No output images received from ComfyUI") | |
| return output_saved | |
| def main(): | |
| if len(sys.argv) < 2: | |
| print("Usage: python add_sphere.py <input_image_path> [input_image_path2] [...]") | |
| sys.exit(1) | |
| image_paths = sys.argv[1:] | |
| total_images = len(image_paths) | |
| print(f"Processing {total_images} image(s)...") | |
| # Connect to ComfyUI websocket once for all images | |
| client_id = str(uuid.uuid4()) | |
| print(f"Connecting to ComfyUI websocket at {COMFY_SERVER_ADDRESS}...") | |
| ws = websocket.WebSocket() | |
| ws.connect(f"ws://{COMFY_SERVER_ADDRESS}/ws?clientId={client_id}") | |
| try: | |
| for i, input_image_path in enumerate(image_paths, 1): | |
| if Path(input_image_path).stem.endswith("_with_sphere"): | |
| print(f"Skipping image {i}/{total_images}: {input_image_path} (already processed)") | |
| continue | |
| print(f"\n{'='*60}") | |
| print(f"Processing image {i}/{total_images}: {input_image_path}") | |
| print(f"{'='*60}") | |
| process_image(input_image_path, ws, client_id) | |
| finally: | |
| ws.close() | |
| print(f"\nDone! Processed {total_images} image(s).") | |
| if __name__ == "__main__": | |
| main() |
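As a quick way to see where the sphere will be inpainted before involving ComfyUI at all, the mask placement can be previewed locally. The snippet below is an illustrative sketch, not part of the original script: it imports create_circular_mask from the file above (assumed to be saved as add_sphere.py, per its usage string) and darkens the masked region of a test photo; example.jpg and mask_preview.jpg are placeholder filenames.

# Preview sketch (hypothetical, not from the gist): visualize the circular mask
# defined by MASK_CENTER_X/Y and MASK_RADIUS_FRACTION on a local test image.
import numpy as np
from PIL import Image

from add_sphere import create_circular_mask  # assumes the script above is saved as add_sphere.py

img = Image.open("example.jpg").convert("RGB")   # placeholder test photo
mask = create_circular_mask(*img.size)           # uint8 array: 255 inside the circle, 0 outside

preview = np.array(img)
preview[mask == 255] = preview[mask == 255] // 2  # darken the region that will be inpainted
Image.fromarray(preview).save("mask_preview.jpg")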
fill_sphere.json (the ComfyUI API-format workflow loaded by the script above):
{
  "17": {
    "inputs": {
      "image": "clipspace/clipspace-painted-masked-1769359946664.png [input]"
    },
    "class_type": "LoadImage",
    "_meta": {"title": "Load Image"}
  },
  "54": {
    "inputs": {
      "images": ["47:8", 0]
    },
    "class_type": "SaveImageWebsocket",
    "_meta": {"title": "SaveImageWebsocket"}
  },
  "47:34": {
    "inputs": {
      "clip_name1": "clip_l.safetensors",
      "clip_name2": "t5xxl_fp16.safetensors",
      "type": "flux",
      "device": "default"
    },
    "class_type": "DualCLIPLoader",
    "_meta": {"title": "DualCLIPLoader"}
  },
  "47:26": {
    "inputs": {
      "guidance": 100,
      "conditioning": ["47:23", 0]
    },
    "class_type": "FluxGuidance",
    "_meta": {"title": "FluxGuidance"}
  },
  "47:32": {
    "inputs": {
      "vae_name": "ae.safetensors"
    },
    "class_type": "VAELoader",
    "_meta": {"title": "Load VAE"}
  },
  "47:31": {
    "inputs": {
      "unet_name": "flux1-fill-dev.safetensors",
      "weight_dtype": "default"
    },
    "class_type": "UNETLoader",
    "_meta": {"title": "Load Diffusion Model"}
  },
  "47:46": {
    "inputs": {
      "conditioning": ["47:23", 0]
    },
    "class_type": "ConditioningZeroOut",
    "_meta": {"title": "ConditioningZeroOut"}
  },
  "47:23": {
    "inputs": {
      "text": "a perfectly round light grey matte sphere, fully visible, with smooth, uniform Lambertian shading that accurately responds to the existing scene lighting. The sphere is directly in front of the camera and appears in front of everything.",
      "clip": ["47:34", 0]
    },
    "class_type": "CLIPTextEncode",
    "_meta": {"title": "CLIP Text Encode (Positive Prompt)"}
  },
  "47:39": {
    "inputs": {
      "strength": 1,
      "model": ["47:31", 0]
    },
    "class_type": "DifferentialDiffusion",
    "_meta": {"title": "Differential Diffusion"}
  },
  "47:8": {
    "inputs": {
      "samples": ["47:3", 0],
      "vae": ["47:32", 0]
    },
    "class_type": "VAEDecode",
    "_meta": {"title": "VAE Decode"}
  },
  "47:38": {
    "inputs": {
      "noise_mask": true,
      "positive": ["47:26", 0],
      "negative": ["47:46", 0],
      "vae": ["47:32", 0],
      "pixels": ["17", 0],
      "mask": ["17", 1]
    },
    "class_type": "InpaintModelConditioning",
    "_meta": {"title": "InpaintModelConditioning"}
  },
  "47:3": {
    "inputs": {
      "seed": 656821733471329,
      "steps": 20,
      "cfg": 1,
      "sampler_name": "euler",
      "scheduler": "normal",
      "denoise": 1,
      "model": ["47:39", 0],
      "positive": ["47:38", 0],
      "negative": ["47:38", 1],
      "latent_image": ["47:38", 2]
    },
    "class_type": "KSampler",
    "_meta": {"title": "KSampler"}
  }
}
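Since add_sphere.py patches this workflow by node ID at runtime (the LoadImage filename in node "17" and the KSampler seed in node "47:3"), a small sanity check like the sketch below can confirm the JSON parses and that those nodes are present before anything is queued. It is an illustrative snippet, not part of the gist.

# Sanity-check sketch (hypothetical): verify fill_sphere.json contains the nodes
# that add_sphere.py overwrites before a prompt is queued.
import json

with open("fill_sphere.json", encoding="utf-8") as f:
    workflow = json.load(f)

assert workflow["17"]["class_type"] == "LoadImage"   # input image is swapped in here
assert workflow["47:3"]["class_type"] == "KSampler"  # seed is overwritten here
print(f"{len(workflow)} nodes, current seed: {workflow['47:3']['inputs']['seed']}")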