image extraction and MLLM management

This commit is contained in:
Nicola Leonardi 2025-11-15 19:11:09 +01:00
commit 4f189ec32e
6 changed files with 878 additions and 0 deletions

11
README.md Normal file
View File

@ -0,0 +1,11 @@
# WCAG AI validator
- Install the required dependencies (inside the docker folder)
```
pip install -r requirements.txt
```
- Start the application
python LLM_accessibility_validator/wcag_validator.py

518
image_extractor.py Normal file
View File

@ -0,0 +1,518 @@
import asyncio
from playwright.async_api import async_playwright
from datetime import datetime, timezone
from urllib.parse import urljoin, urlparse
from typing import List, Dict, Optional
import json
import argparse
from utils import disclaim_bool_string, prepare_output_folder, create_folder
import requests
import os
class ImageExtractor:
    """Extract images from a web page, together with their surrounding textual
    context and page-level metadata, using Playwright (headless Chromium)."""

    # Image formats accepted for analysis (matched against URL path or query string).
    SUPPORTED_FORMATS = {"png", "jpeg", "jpg", "webp", "gif"}

    def __init__(
        self,
        url: str,
        context_levels: int = 5,
        pixel_distance_threshold: int = 200,
        number_of_images: int = 10,
        save_images=True,
        save_images_path="",
    ):
        """
        Initialize the ImageExtractor.

        Args:
            url: The page URL to extract images from
            context_levels: Number of parent/child levels to traverse for context (default=5)
            pixel_distance_threshold: Maximum pixel distance for nearby text elements (default=200)
            number_of_images: maximum number for the desired images
            save_images: if save images (bool, or the strings "True"/"False")
            save_images_path: path (directory) to save images into
        """
        self.url = url
        self.context_levels = context_levels
        self.pixel_distance_threshold = pixel_distance_threshold
        self.number_of_images = number_of_images
        self.save_images = save_images
        self.save_images_path = save_images_path

    def _is_supported_format(self, img_url: str) -> bool:
        """Check if the image URL has a supported format."""
        parsed = urlparse(img_url.lower())
        path = parsed.path
        # Check file extension
        for fmt in self.SUPPORTED_FORMATS:
            if path.endswith(f".{fmt}"):
                return True
        # Also check query parameters (e.g., format=jpeg)
        # NOTE(review): this substring test matches a format name anywhere in
        # the URL, not just in query parameters — confirm this looseness is intended.
        return any(fmt in img_url.lower() for fmt in self.SUPPORTED_FORMATS)

    async def _download_image(self, image_url, output_dir="images") -> None:
        """Download image_url into output_dir; failures are logged, not raised."""
        try:
            # Get file extension from URL
            ext = image_url.split(".")[-1].split("?")[0]
            # File name = last URL segment with the extension (and dot) stripped.
            image_name = image_url.split("/")[-1][0 : -len(ext) - 1]
            # Fall back to jpg when the URL carries no recognized extension.
            if ext not in ["jpg", "jpeg", "png", "gif", "webp"]:
                ext = "jpg"
            # Download the image
            print("getting image:", image_url)
            response = requests.get(image_url, timeout=10)
            response.raise_for_status()
            # Save the image
            output_path = os.path.join(output_dir, f"{image_name}.{ext}")
            with open(output_path, "wb") as f:
                f.write(response.content)
            print(f"Saved: {output_path}")
        except Exception as e:
            # Best-effort download: report and continue with the next image.
            print(f"Error downloading {image_url}: {e}")

    async def save_elaboration(self, images, output_dir) -> None:
        """Dump the extracted image records to output_dir (a file path) as pretty JSON."""
        with open(output_dir, "w", encoding="utf-8") as f:
            json.dump(images, f, indent=2, ensure_ascii=False)
        print("\nResults saved to extracted_images.json")

    async def _get_element_context(self, page, img_element) -> tuple[str, str, str]:
        """
        Extract textual context around an image element from text-containing tags.

        Returns:
            Tuple of (full_context, immediate_context, nearby_text) where:
            - full_context: Text extracted with self.context_levels
            - immediate_context: Text extracted with context_level=1
            - nearby_text: Text within pixel_distance_threshold pixels of the image
        """
        try:
            # JavaScript function to check if element is visible
            """
            Visibility Checks :
            visibility CSS property - Excludes elements with visibility: hidden or visibility: collapse
            display CSS property - Excludes elements with display: none
            opacity CSS property - Excludes elements with opacity: 0
            Element dimensions - Excludes elements with zero width or height (collapsed elements)
            """
            visibility_check = """
            function isVisible(el) {
                if (!el) return false;
                const style = window.getComputedStyle(el);
                // Check visibility and display properties
                if (style.visibility === 'hidden' || style.visibility === 'collapse') return false;
                if (style.display === 'none') return false;
                if (style.opacity === '0') return false;
                // Check if element has dimensions
                const rect = el.getBoundingClientRect();
                if (rect.width === 0 || rect.height === 0) return false;
                return true;
            }
            """

            # JavaScript function to extract text at a specific context level.
            # Built as an f-string: {{ }} are literal JS braces, {levels} and
            # {visibility_check} are interpolated from Python.
            def get_context_js(levels):
                return f"""
                (element) => {{
                    {visibility_check}
                    // Text-containing tags to extract
                    /*const textTags = ['p', 'span', 'div', 'a', 'li', 'td', 'th', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6',
                        'label', 'figcaption', 'caption', 'blockquote', 'pre', 'code', 'em', 'strong',
                        'b', 'i', 'u', 'small', 'mark', 'sub', 'sup', 'time', 'article', 'section'];*/
                    const textTags = ['p', 'span', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'a'];
                    let textContent = [];
                    // Traverse up the DOM tree
                    let current = element;
                    for (let i = 0; i < {levels} && current.parentElement; i++) {{
                        current = current.parentElement;
                    }}
                    // Function to extract text from an element and its children
                    function extractText(el, depth = 0) {{
                        if (depth > {levels}) return;
                        // Skip if element is not visible
                        if (!isVisible(el)) return;
                        // Get direct text content of text-containing elements
                        if (textTags.includes(el.tagName.toLowerCase())) {{
                            const text = el.textContent.trim();
                            if (text && text.length > 0) {{
                                textContent.push({{
                                    tag: el.tagName.toLowerCase(),
                                    text: text
                                }});
                            }}
                        }}
                        // Recursively process children
                        for (let child of el.children) {{
                            extractText(child, depth + 1);
                        }}
                    }}
                    // Extract text from the context root
                    extractText(current);
                    // Format as readable text
                    //return textContent.map(item => `<${{item.tag}}>: ${{item.text}}`).join('\\n\\n');
                    return textContent.map(item => `<${{item.tag}}>: ${{item.text}}`).join(' ');
                }}
                """

            # JavaScript function to extract nearby text based on pixel distance
            nearby_text_js = f"""
            (element) => {{
                {visibility_check}
                /*const textTags = ['p', 'span', 'div', 'a', 'li', 'td', 'th', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6',
                    'label', 'figcaption', 'caption', 'blockquote', 'pre', 'code', 'em', 'strong',
                    'b', 'i', 'u', 'small', 'mark', 'sub', 'sup', 'time'];*/
                const textTags = ['p', 'span', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'a'];
                const threshold = {self.pixel_distance_threshold};
                const imgRect = element.getBoundingClientRect();
                const imgCenterX = imgRect.left + imgRect.width / 2;
                const imgCenterY = imgRect.top + imgRect.height / 2;
                // Calculate distance between two rectangles.
                function getDistance(rect1, rect2) {{
                    // Get centers
                    const x1 = rect1.left + rect1.width / 2;
                    const y1 = rect1.top + rect1.height / 2;
                    const x2 = rect2.left + rect2.width / 2;
                    const y2 = rect2.top + rect2.height / 2;
                    // Euclidean distance
                    return Math.sqrt(Math.pow(x2 - x1, 2) + Math.pow(y2 - y1, 2)); //This can be changed considering not only the distance between the centers but maybe the nearest points
                }}
                let nearbyElements = [];
                // Find all text elements on the page
                const allElements = document.querySelectorAll(textTags.join(','));
                allElements.forEach(el => {{
                    // Skip if element is not visible
                    if (!isVisible(el)) return;
                    const text = el.textContent.trim();
                    if (!text || text.length === 0) return;
                    // Skip if it's the image itself or contains the image
                    if (el === element || el.contains(element)) return;
                    const elRect = el.getBoundingClientRect();
                    const distance = getDistance(imgRect, elRect);
                    if (distance <= threshold) {{
                        nearbyElements.push({{
                            tag: el.tagName.toLowerCase(),
                            text: text,
                            distance: Math.round(distance)
                        }});
                    }}
                }});
                // Sort by distance
                nearbyElements.sort((a, b) => a.distance - b.distance);
                // Format output
                //return nearbyElements.map(item =>
                //    `<${{item.tag}}> [${{item.distance}}px]: ${{item.text}}`
                //).join('\\n\\n');
                return nearbyElements.map(item =>
                    `<${{item.tag}}> [${{item.distance}}px]: ${{item.text}}`
                ).join(' ');
            }}
            """
            # Get full context with self.context_levels
            full_context_js = get_context_js(self.context_levels)
            full_context = await img_element.evaluate(full_context_js)
            full_context = full_context if full_context else "No textual context found"
            # Get immediate context with level=1
            immediate_context_js = get_context_js(1)
            immediate_context = await img_element.evaluate(immediate_context_js)
            immediate_context = (
                immediate_context if immediate_context else "No immediate context found"
            )
            # Get nearby text based on pixel distance
            nearby_text = await img_element.evaluate(nearby_text_js)
            nearby_text = nearby_text if nearby_text else "No nearby text found"
            return full_context, immediate_context, nearby_text
        except Exception as e:
            # Any evaluate() failure degrades to the same error string in all
            # three slots so callers never see a partial tuple.
            error_msg = f"Error extracting context: {str(e)}"
            return error_msg, error_msg, error_msg

    async def _get_page_metadata(self, page) -> Dict[str, Optional[str]]:
        """Extract page metadata including title, description, and keywords."""
        metadata = {
            "title": await page.title(),
            "description": None,
            "keywords": None,
            "headings": [],
        }
        # Extract meta description
        try:
            description = await page.locator('meta[name="description"]').get_attribute(
                "content"
            )
            metadata["description"] = description
        except:  # meta tag absent — keep the None default
            pass
        # Extract meta keywords
        try:
            keywords = await page.locator('meta[name="keywords"]').get_attribute(
                "content"
            )
            metadata["keywords"] = keywords
        except:  # meta tag absent — keep the None default
            pass
        # Extract all headings (h1-h6)
        for level in range(1, 7):
            headings = await page.locator(f"h{level}").all_text_contents()
            for heading in headings:
                if heading.strip():
                    metadata["headings"].append(
                        {"level": level, "text": heading.strip()}
                    )
        return metadata

    async def extract_images(self) -> List[Dict]:
        """
        Extract all images from the page with their metadata and context.

        Returns:
            List of dictionaries containing image information
        """
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            page = await browser.new_page()
            try:
                # await page.goto(self.url, wait_until='networkidle')#original
                # ---alternative
                await page.goto(self.url, wait_until="load")
                # Wait for page to load completely
                await page.wait_for_timeout(2000)  # Wait for dynamic content
                # -----
                # Get page metadata once
                page_metadata = await self._get_page_metadata(page)
                # Find all img elements
                img_elements = await page.locator("img").all()
                image_source_list = []  # avoid multiple check for the same image url
                images_data = []
                for img in img_elements[
                    0 : self.number_of_images
                ]:  # limits the image list based on the ini param
                    try:
                        # Get image src
                        src = await img.get_attribute("src")
                        if not src:
                            continue
                        # Skip duplicates: the same src may appear multiple times.
                        if src not in image_source_list:
                            image_source_list.append(src)
                        else:
                            print("image src", src, "already processed. Skipped.")
                            continue
                        # Convert relative URLs to absolute
                        img_url = urljoin(self.url, src)
                        # Verify format
                        if not self._is_supported_format(img_url):
                            continue
                        if disclaim_bool_string(self.save_images) == True:
                            print("save image:", img_url.split("/")[-1])
                            await self._download_image(
                                image_url=img_url, output_dir=self.save_images_path
                            )
                        # Get alt text
                        alt_text = await img.get_attribute("alt") or ""
                        # Get surrounding HTML context (full, immediate, and nearby)
                        html_context, immediate_context, nearby_text = (
                            await self._get_element_context(page, img)
                        )
                        # Compile image data
                        image_info = {
                            "url": img_url,
                            "alt_text": alt_text,
                            "html_context": html_context,
                            "immediate_context": immediate_context,
                            "nearby_text": nearby_text,
                            "page_url": self.url,
                            "page_title": page_metadata["title"],
                            "page_description": page_metadata["description"],
                            "page_keywords": page_metadata["keywords"],
                            "page_headings": page_metadata["headings"],
                        }
                        images_data.append(image_info)
                    except Exception as e:
                        # One bad image must not abort the whole extraction.
                        print(f"Error processing image: {str(e)}")
                        continue
                return images_data
            finally:
                # Always release the browser, even on errors.
                await browser.close()
async def main(args):
    """Run a standalone image extraction from the parsed CLI arguments.

    Args:
        args: argparse.Namespace with page_url, context_levels,
            pixel_distance_threshold, number_of_images, save_images and
            save_elaboration attributes.
    """
    url = args.page_url
    context_levels = args.context_levels
    pixel_distance_threshold = args.pixel_distance_threshold
    number_of_images = args.number_of_images
    save_images = args.save_images
    print(
        "call ImageExtractor with-",  # BUG FIX: was misspelled "ImageExtrcator"
        "page_url:",
        url,
        "context_levels:",
        context_levels,
        "pixel_distance_threshold:",
        pixel_distance_threshold,
        "number_of_images:",
        number_of_images,
        "save_images:",
        save_images,
    )
    # BUG FIX: initialize both paths so ImageExtractor always receives a
    # defined save_images_path; previously this raised NameError when
    # save_images was disabled.
    output_dir = ""
    images_output_dir = ""
    if (
        disclaim_bool_string(args.save_elaboration)
        or disclaim_bool_string(args.save_images)
    ):  # if something to save
        # Turn the URL into a filesystem-safe folder name.
        url_path = url.replace(":", "").replace("//", "_").replace("/", "_")
        now = datetime.now(timezone.utc)
        now_str = now.strftime("%Y_%m_%d-%H_%M_%S")
        output_dir = prepare_output_folder(url_path, now_str)
        if disclaim_bool_string(args.save_images):
            images_output_dir = create_folder(
                output_dir, directory_separator="/", next_path="images"
            )
            print("save images path:", images_output_dir)
    # Create extractor
    extractor = ImageExtractor(
        url,
        context_levels=context_levels,
        pixel_distance_threshold=pixel_distance_threshold,
        number_of_images=number_of_images,
        save_images=save_images,
        save_images_path=images_output_dir,
    )
    # Extract images
    print(f"Extracting images from: {url}")
    images = await extractor.extract_images()
    print(f"\nFound {len(images)} supported images\n")
    # Display a short summary for each extracted image.
    for i, img in enumerate(images, 1):
        print(f"Image {i}:")
        print(f"  URL: {img['url']}")
        print(f"  Alt text: {img['alt_text']}")
        print(f"  Page title: {img['page_title']}")
        print(f"  Full context length: {len(img['html_context'])} characters")
        print(f"  Immediate context length: {len(img['immediate_context'])} characters")
        print(f"  Nearby text length: {len(img['nearby_text'])} characters")
        print(f"  Number of headings on page: {len(img['page_headings'])}")
        print("-" * 80)
    if disclaim_bool_string(args.save_elaboration):  # Optionally save to JSON
        await extractor.save_elaboration(
            images, output_dir=output_dir + "/extracted_images.json"
        )
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"--page_url",
type=str,
help=("Url page to analyze"),
default="https://www.bbc.com",
)
parser.add_argument(
"--context_levels",
type=int,
default=5,
help=("HTML context levels around the image"),
)
parser.add_argument(
"--pixel_distance_threshold",
type=int,
default=200,
help=("pixel distance threshold around the image"),
)
parser.add_argument(
"--number_of_images",
type=int,
default=10,
help=("max number of desired images"),
)
parser.add_argument(
"--save_elaboration",
action="store_true",
default=True,
help=("If True save the elaborated info in a json file"),
)
parser.add_argument(
"--save_images",
action="store_true",
default=True,
help=("If True save the images"),
)
args = parser.parse_args()
asyncio.run(main(args))

70
mllm_management.py Normal file
View File

@ -0,0 +1,70 @@
from utils import call_API_urlibrequest
class MLLMManager:
    """Thin client for a chat-completions style multimodal LLM endpoint,
    including prompt builders for WCAG alt-text evaluation."""

    def __init__(self, end_point, api_key):
        # end_point: full URL of the chat-completions API.
        # api_key: bearer token sent in the Authorization header.
        self.end_point = end_point
        self.api_key = api_key

    def get_response(self, system_prompt, user_prompt):
        """POST the prompts to the endpoint and return the parsed JSON response.

        Args:
            system_prompt: system message content (string).
            user_prompt: user message content (string or multimodal part list).

        Returns:
            The decoded JSON response from call_API_urlibrequest (an error
            dict on repeated failure).
        """
        payload = self.create_mllm_payload(system_prompt, user_prompt)
        headers = [
            ["Content-Type", "application/json"],
            ["Authorization", f"Bearer {self.api_key}"],
        ]
        response = call_API_urlibrequest(url=self.end_point, headers=headers, data=payload)
        return response

    def create_mllm_payload(self, system_prompt, user_prompt):
        """Build the chat-completions request body with fixed sampling settings."""
        payload = {
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt},
            ],
            "temperature": 0.7,
            "top_p": 0.95,
            "frequency_penalty": 0,
            "presence_penalty": 0,
            "max_tokens": 800,
            "stop": None,
        }
        return payload

    def get_alt_text_system_prompt(self):
        """Return the system prompt instructing the MLLM to judge alt-text per WCAG."""
        # BUG FIX: corrected the typo "alterative text" -> "alternative text".
        system_prompt = """You are a web accessibility evaluation tool. Your task is to evaluate if alternative text for
images on webpages are appropriate according to WCAG guidelines. The alt-text should serve the same purpose and present
the same information as the image, and should be able to substitute for the non-text content. The text alternative would
be brief but as informative as possible.
Follow these instructions carefully:
1. You will be provided as input with the following:
- The image found on the webpage.
- The associated alternative text. When the alt-text is empty or absent, you will be explicitly informed.
- The surrounding context of the image.
- The page title, headings and the content of the keywords and description <meta> tag, if found.
2. Determine the function and purpose of the image by analyzing these elements. Take into account the purpose and function
of the associated image by considering the page context. Check also if the image is, or is associated with, a link or a button,
and consider this in your judgement. If the image contains text use that as part of the context.
3. Provide a final assessment based on the following:
- 'success' if you can assess with 'sufficient certainty' the alt-text is appropriate in relation to the image purpose,
- 'failure' if you can assess with 'sufficient certainty' that the alt-text is NOT appropriate,
- 'warning' if you cannot determine with 'sufficient certainty'.
where the level of certainty goes from 1 to 100 and 'sufficient certainty' means > 80
4. The original alt-text assessment on a scale from 1 to 5, where 5 is the best score. Use an integer number only.
5. Provide a brief reasoning for your judgment. If the image contains text, write it verbatim. Your response should be in English.
6. Keep your response within 150 words.
7. Generate the new most appropriate alt-text given the context and the steps before. Keep this within 30 words.
8. Here is the JSON format the results must have:
{"Original alt-text assessment" : "*your original alt-text assessment*", "Assessment" : "*your assessment*", "EvaluationResult": "*your response*", "New alt-text":"*new alt-text*"}"""
        return system_prompt

    def get_alt_text_user_prompt(self, altTextMessage, imageURL, HTMLcontext, pageText):
        """Assemble the multimodal user message: alt-text, image URL, context, page text."""
        user_prompt = [
            {"type": "text", "text": altTextMessage},
            {"type": "image_url", "image_url": {"url": imageURL}},
            {"type": "text", "text": HTMLcontext},
            {"type": "text", "text": pageText},
        ]
        return user_prompt

4
requirements.txt Normal file
View File

@ -0,0 +1,4 @@
pandas==2.3.3
playwright==1.56.0
python-dotenv==1.2.1
requests==2.32.5

124
utils.py Normal file
View File

@ -0,0 +1,124 @@
import json
import time
import urllib.request
import urllib.parse
import logging
import os
exception_msg = "Exception: %s"
def call_API_urlibrequest(
    data=None,
    verbose=False,
    url="",
    headers=None,
    method="post",
    base=2,  # number of seconds to wait (backoff base)
    max_tries=3,
):
    """Call a JSON HTTP API via urllib, retrying with exponential backoff.

    Args:
        data: dict payload (query params for "get", JSON body for "post").
        verbose: when True, log the input payload.
        url: the endpoint URL.
        headers: list of [name, value] header pairs to attach to the request.
        method: "get" or "post"; any other value returns an error dict.
        base: attempt i waits base**i seconds before the next retry.
        max_tries: number of attempts before giving up.

    Returns:
        The decoded JSON response on success;
        {"error_message": "method_not_allowed"} for an unsupported method;
        {"error": <exception>} after max_tries failed attempts.
    """
    # BUG FIX: the original used mutable default arguments ({} and []),
    # which are shared across calls.
    data = {} if data is None else data
    headers = [] if headers is None else headers
    if verbose:
        logging.info("input_data:%s", data)
    # Allow multiple attempts to call the API in case of downtime.
    wait_seconds = [base**i for i in range(max_tries)]
    # BUG FIX: ensure response_json is defined even if max_tries < 1.
    response_json = {"error": "no attempt made"}
    for num_tries in range(max_tries):
        try:
            if method == "get":
                # Encode the parameters and append them to the URL
                query_string = urllib.parse.urlencode(data)
                url_with_params = f"{url}?{query_string}"
                request = urllib.request.Request(url_with_params, method="GET")
                for ele in headers:
                    request.add_header(ele[0], ele[1])
            elif method == "post":
                # Convert the dictionary to a JSON formatted string and encode it to bytes
                data_to_send = json.dumps(data).encode("utf-8")
                request = urllib.request.Request(url, data=data_to_send, method="POST")
                for ele in headers:
                    request.add_header(ele[0], ele[1])
            else:
                return {"error_message": "method_not_allowed"}
            # Send the request and capture the response
            with urllib.request.urlopen(request) as response:
                # Read and decode the response
                response_json = json.loads(response.read().decode("utf-8"))
                logging.info("response_json:%s", response_json)
                logging.info("response.status_code:%s", response.getcode())
                return response_json
        except Exception as e:
            logging.error("error message:%s", e)
            response_json = {"error": e}
            logging.info("num_tries:%s", num_tries)
            # BUG FIX: the original also slept after the FINAL failed attempt;
            # only wait when another attempt will actually follow.
            if num_tries < max_tries - 1:
                logging.info(
                    "Waiting %s seconds before automatically trying again.",
                    str(wait_seconds[num_tries]),
                )
                time.sleep(wait_seconds[num_tries])
    logging.info(
        "Tried %s times to make API call to get a valid response object", max_tries
    )
    logging.info("Returning provided response")
    return response_json
def disclaim_bool_string(value):
    """Interpret a CLI boolean that may arrive as a string or a real bool.

    Returns True only for the exact string "True" or the bool True.
    Any other string (including "true"), and any other type (including
    None), yields False.
    """
    if isinstance(value, str):
        # Strict, case-sensitive match — mirrors argparse passing "True"/"False".
        return value == "True"
    if isinstance(value, bool):
        return value
    # BUG FIX: the original fell through and implicitly returned None for
    # any other type; return an explicit False instead.
    return False
def prepare_output_folder(file, now_str):
    """Create ./outputs/<file>_<now_str> under the current directory.

    Returns the created path; on failure returns whatever partial path was
    built so far ("" if nothing was created).
    """
    folder = ""
    try:
        # Ensure the shared "outputs" root exists first...
        folder = create_folder(
            root_path=os.getcwd(),
            directory_separator="/",
            next_path="outputs",
        )
        # ...then the run-specific subfolder named after the page and timestamp.
        folder = create_folder(
            root_path=folder,
            directory_separator="/",
            next_path=file + "_" + now_str,
        )
    except Exception as err:
        logging.error("error prepare output folder:%s", err)
    return folder
def create_folder(root_path, directory_separator, next_path):
    """Ensure <root_path><directory_separator><next_path> exists and return it.

    NOTE: on any OS error this terminates the whole process with exit code 1,
    matching the original behavior.
    """
    target = root_path + directory_separator + next_path
    try:
        # Only create the directory when it is not already present.
        if not os.path.exists(target):
            os.mkdir(target)
    except Exception as err:
        logging.error(exception_msg, err)
        exit(1)
    return target

151
wcag_validator.py Normal file
View File

@ -0,0 +1,151 @@
import sys
import argparse
import json
import asyncio
from utils import disclaim_bool_string, prepare_output_folder, create_folder
from datetime import datetime, timezone
from dotenv import load_dotenv, find_dotenv
import os
import warnings
warnings.filterwarnings("ignore")
exception_msg = "Exception: %s"
from image_extractor import ImageExtractor
from mllm_management import MLLMManager
async def cli(sys_argv):
    """CLI entry point: extract images from a page, then ask an MLLM to
    evaluate each image's alt-text against WCAG guidance.

    Args:
        sys_argv: argument list to parse (e.g. sys.argv[1:]).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--page_url",
        type=str,
        help=("Url page to analyze"),
        default="https://www.bbc.com",
    )
    parser.add_argument(
        "--context_levels",
        type=int,
        default=5,
        help=("HTML context levels around the image"),
    )
    parser.add_argument(
        "--pixel_distance_threshold",
        type=int,
        default=200,
        help=("pixel distance threshold around the image"),
    )
    parser.add_argument(
        "--number_of_images",
        type=int,
        default=10,
        help=("max number of desired images"),
    )
    # BUG FIX: action="store_true" with default=True made these flags
    # impossible to disable; BooleanOptionalAction keeps --save_* working
    # and adds --no-save_* to turn them off.
    parser.add_argument(
        "--save_elaboration",
        action=argparse.BooleanOptionalAction,
        default=True,
        help=("If True save the elaborated info in a json file"),
    )
    parser.add_argument(
        "--save_images",
        action=argparse.BooleanOptionalAction,
        default=True,
        help=("If True save the images"),
    )
    # BUG FIX: the original called parse_args() and silently ignored sys_argv.
    args = parser.parse_args(sys_argv)
    print("wcag validator args:", args)
    # BUG FIX: give both paths defaults so ImageExtractor and the save step
    # never hit a NameError when the save flags are disabled.
    output_dir = ""
    images_output_dir = ""
    if (
        disclaim_bool_string(args.save_elaboration)
        or disclaim_bool_string(args.save_images)
    ):  # if something to save
        # Turn the URL into a filesystem-safe folder name.
        url_path = args.page_url.replace(":", "").replace("//", "_").replace("/", "_")
        now = datetime.now(timezone.utc)
        now_str = now.strftime("%Y_%m_%d-%H_%M_%S")
        output_dir = prepare_output_folder(url_path, now_str)
        if disclaim_bool_string(args.save_images):
            images_output_dir = create_folder(
                output_dir, directory_separator="/", next_path="images"
            )
            print("save images path:", images_output_dir)
    ### Task #1: ---------- Image Extractor
    # Create extractor
    image_extractor = ImageExtractor(
        args.page_url,
        context_levels=args.context_levels,
        pixel_distance_threshold=args.pixel_distance_threshold,
        number_of_images=args.number_of_images,
        save_images=args.save_images,
        save_images_path=images_output_dir,
    )
    # Extract images
    print(f"Extracting images from: {args.page_url}")
    images = await image_extractor.extract_images()
    if disclaim_bool_string(args.save_elaboration):  # Optionally save to JSON
        await image_extractor.save_elaboration(
            images, output_dir=output_dir + "/extracted_images.json"
        )
    # ---------------------------------------------
    ### Task #2: ---------- MLLM management
    env_path = find_dotenv(filename=".env")
    _ = load_dotenv(env_path)  # read .env file
    mllm_end_point = os.getenv("mllm_end_point")
    mllm_api_key = os.getenv("mllm_api_key")
    print("mllm_end_point:", mllm_end_point)
    mllm_manager = MLLMManager(mllm_end_point, mllm_api_key)
    print("mllm_manager.end_point:", mllm_manager.end_point)
    alt_text_system_prompt = mllm_manager.get_alt_text_system_prompt()
    print("alt_text_system_prompt:", alt_text_system_prompt)
    mllm_responses = []
    for img_info in images:
        alt_text = "Here is the alt-text of the image: " + img_info["alt_text"]
        image_URL = img_info["url"]
        HTML_context = "Here is the surrounding HTML context of the element: " + img_info["html_context"]
        page_text = "Here is the content of the page: Title of the page: " + str(img_info["page_title"])
        page_text = page_text + ", content of the <meta name='description'> tag: " + str(img_info["page_description"])
        page_text = page_text + ", content of the <meta name='keywords'> tag: " + str(img_info["page_keywords"])
        # skip headings
        print("Processing image URL:", image_URL)
        print("Alt-text:", alt_text)
        print("HTML context:", HTML_context)
        print("Page text:", page_text)
        alt_text_user_prompt = mllm_manager.get_alt_text_user_prompt(
            altTextMessage=alt_text,
            imageURL=image_URL,
            HTMLcontext=HTML_context,
            pageText=page_text,
        )
        print("alt_text_user_prompt:", alt_text_user_prompt)
        mllm_response = mllm_manager.get_response(
            system_prompt=alt_text_system_prompt,
            user_prompt=alt_text_user_prompt,
        )
        # ROBUSTNESS FIX: the endpoint may return an error dict instead of a
        # chat-completions response; record the problem instead of crashing.
        try:
            mllm_responses.append(mllm_response["choices"][0]["message"]["content"])
        except (KeyError, IndexError, TypeError):
            mllm_responses.append(
                {"error": f"unexpected MLLM response: {mllm_response}"}
            )
    if disclaim_bool_string(args.save_elaboration):  # Optionally save to JSON
        with open(output_dir + "/mllm_responses.json", "w", encoding="utf-8") as f:
            json.dump(mllm_responses, f, indent=2, ensure_ascii=False)
if __name__ == '__main__':
    # Script entry point: forward the CLI arguments (minus the program name)
    # to the async cli() coroutine.
    asyncio.run(cli(sys.argv[1:]))