UI release

This commit is contained in:
parent e53ac19298
commit 02d11a4c6e
@ -25,4 +25,7 @@ python wcag_validator.py

## For the REST service, use:

python wcag_validator_RESTserver.py

## For the UI, use:

python ui_alt_text.py

## The scripts folder contains some elaboration scripts; they require a dedicated requirements file.
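As a quick smoke test of the REST service (a sketch: the endpoint and payload come from the code in this commit, and it assumes the server listens on the default http://localhost:8000 used elsewhere in the code):

curl -X POST http://localhost:8000/extract_images -H "Content-Type: application/json" -d '{"page_url": "https://www.bbc.com", "number_of_images": 10}'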
@ -0,0 +1 @@
gradio==5.49.1
@ -0,0 +1,576 @@

#### To launch the script
# gradio ui_alt_text.py
# python ui_alt_text.py

import gradio as gr
import requests

# from ..dependences.utils import call_API_urlibrequest
import logging
import time
import json
import urllib.request
import urllib.parse
import os
import sqlite3


WCAG_VALIDATOR_RESTSERVER_HEADERS = [("Content-Type", "application/json")]

url_list = [
    "https://amazon.com",
    "https://web.archive.org/web/20251126051721/https://www.amazon.com/",
    "https://web.archive.org/web/20230630235957/http://www.amazon.com/",
    "https://ebay.com",
    "https://walmart.com",
    "https://etsy.com",
    "https://target.com",
    "https://wayfair.com",
    "https://bestbuy.com",
    "https://macys.com",
    "https://homedepot.com",
    "https://costco.com",
    "https://www.ansa.it",
    "https://en.wikipedia.org/wiki/Main_Page",
    "https://www.lanazione.it",
    "https://www.bbc.com",
    "https://www.cnn.com",
    "https://www.nytimes.com",
    "https://www.theguardian.com",
]

# ------ TODO use from utils instead of redefining here


def call_API_urlibrequest(
    data={},
    verbose=False,
    url="",
    headers=[],
    method="post",
    base=2,  # number of seconds to wait
    max_tries=3,
):

    if verbose:
        logging.info("input_data:%s", data)

    # Allow multiple attempts to call the API in case of downtime.
    # Return the last response to the user after 3 failed attempts.
    wait_seconds = [base**i for i in range(max_tries)]

    for num_tries in range(max_tries):
        try:
            if method == "get":
                # Encode the parameters and append them to the URL
                query_string = urllib.parse.urlencode(data)
                url_with_params = f"{url}?{query_string}"
                request = urllib.request.Request(url_with_params, method="GET")
                for ele in headers:
                    request.add_header(ele[0], ele[1])

            elif method == "post":
                # Convert the dictionary to a JSON formatted string and encode it to bytes
                data_to_send = json.dumps(data).encode("utf-8")
                request = urllib.request.Request(url, data=data_to_send, method="POST")
                for ele in headers:
                    request.add_header(ele[0], ele[1])
            else:
                return {"error_message": "method_not_allowed"}

            # Send the request and capture the response
            with urllib.request.urlopen(request) as response:
                # Read and decode the response
                response_json = json.loads(response.read().decode("utf-8"))
                logging.info("response_json:%s", response_json)
                logging.info("response.status_code:%s", response.getcode())
                return response_json

        except Exception as e:
            logging.error("error message:%s", e)
            response_json = {"error": str(e)}

            logging.info("num_tries:%s", num_tries)
            logging.info(
                "Waiting %s seconds before automatically trying again.",
                str(wait_seconds[num_tries]),
            )
            time.sleep(wait_seconds[num_tries])

    logging.info(
        "Tried %s times to make API call to get a valid response object", max_tries
    )
    logging.info("Returning provided response")
    return response_json
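
# Example usage (sketch): a GET with query parameters; the /health path is an
# assumption here, any JSON endpoint of the REST server works the same way.
# health = call_API_urlibrequest(
#     data={},
#     url="http://localhost:8000/health",
#     headers=WCAG_VALIDATOR_RESTSERVER_HEADERS,
#     method="get",
# )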

def create_folder(root_path, directory_separator, next_path):
    output_dir = root_path + directory_separator + next_path
    try:
        if not os.path.exists(output_dir):
            os.mkdir(output_dir)

    except Exception as e:
        logging.error("exception creating folder %s: %s", output_dir, e)
        exit(1)
    return output_dir


def db_persistence_startup(
    db_name_and_path="persistence/wcag_validator.db",
    table="wcag_validator_results",
):

    try:
        _ = create_folder(
            root_path=os.getcwd(),
            directory_separator="/",
            next_path="persistence",
        )

    except Exception as e:
        logging.error("exception on db persistence startup:%s", e)
        exit(1)
    try:
        db_connection = sqlite3.connect(db_name_and_path)
        cursor = db_connection.cursor()
        # Create a table to store JSON data
        cursor.execute(
            """CREATE TABLE IF NOT EXISTS """
            + table
            + """ (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                insertion_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                insert_type TEXT,
                json_input_data TEXT, json_output_data TEXT
            )"""
        )

        db_connection.commit()
        logging.info("connection to the database established")
        return db_connection

    except Exception as e:
        logging.error("db_management problem:%s", e)
        exit(1)


def db_persistence_insert(
    connection_db,
    insert_type,
    json_in_str,
    json_out_str,
    table="wcag_validator_results",
):

    try:
        cursor = connection_db.cursor()

        # Insert JSON data into the table along with the current timestamp
        cursor.execute(
            "INSERT INTO "
            + table
            + " (insert_type,json_input_data,json_output_data) VALUES (?,?,?)",
            (insert_type, json_in_str, json_out_str),
        )
        connection_db.commit()
        logging.info(
            "Data correctly saved on local db table:%s, insertion type:%s",
            table,
            insert_type,
        )
    except Exception as e:
        logging.error("exception" + " %s", e)
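
# Example usage (sketch): persist one assessment round trip; the insert_type
# label is free-form and both payloads must already be JSON strings.
# db_persistence_insert(
#     connection_db,
#     "user_assessment",
#     json.dumps({"page_url": "https://www.bbc.com"}),
#     json.dumps({"assessment": 4}),
#     table="wcag_user_assessments",
# )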

# ------- End TODO use from utils instead of redefining here


# Method 1: Embed external website (works only for sites that allow iframes)
def create_iframe(url):
    iframe_html = (
        f'<iframe src="{url}" width="100%" height="600px" frameborder="0"></iframe>'
    )
    return iframe_html


def load_images_from_json(json_input):
    """Extract URLs and alt text from JSON and create HTML gallery"""
    try:
        data = json_input

        if "images" not in data or not data["images"]:
            return "No images found in JSON", ""

        images = data["images"]
        info_text = f"Found {len(images)} image(s)\n"
        print(f"Found {len(data['images'])} image(s)")

        # Create HTML gallery with checkboxes and assessment forms
        html = """
        <style>
        .image-gallery {
            display: grid;
            grid-template-columns: repeat(auto-fill, minmax(250px, 1fr));
            gap: 20px;
            padding: 20px;
        }
        .image-card {
            border: 2px solid #e0e0e0;
            border-radius: 8px;
            padding: 10px;
            background: white;
        }
        .image-card:has(input:checked) {
            border-color: #2196F3;
            background: #a7c1c1;
        }
        .image-card img {
            width: 100%;
            height: 200px;
            object-fit: cover;
            border-radius: 4px;
        }
        .image-info {
            margin-top: 10px;
        }
        .checkbox-label {
            display: flex;
            align-items: center;
            gap: 8px;
            cursor: pointer;
            font-weight: 500;
        }
        .image-checkbox {
            width: 18px;
            height: 18px;
            cursor: pointer;
            accent-color: #2196F3;
        }
        .alt-text {
            font-size: 14px;
            color: #666;
            margin-top: 5px;
        }
        .assessment-panel {
            display: none;
            margin-top: 15px;
            padding: 10px;
            background: #f0f7ff;
            border-radius: 4px;
            border: 1px solid #2196F3;
        }
        .assessment-panel.visible {
            display: block;
        }
        .form-group {
            margin: 10px 0;
        }
        .form-group label {
            display: block;
            font-weight: 500;
            margin-bottom: 5px;
            font-size: 13px;
        }
        .range-container {
            display: flex;
            align-items: center;
            gap: 10px;
        }
        .range-container input[type="range"] {
            flex: 1;
        }
        .range-value {
            font-weight: bold;
            min-width: 20px;
            text-align: center;
        }
        textarea {
            width: 100%;
            padding: 8px;
            border: 1px solid #ccc;
            border-radius: 4px;
            font-size: 13px;
            font-family: inherit;
            resize: vertical;
        }
        </style>
        <div class="image-gallery">
        """

        for idx, img_data in enumerate(images):
            url = img_data.get("url", "")
            alt_text = img_data.get("alt_text", "No description")

            html += f"""
            <div class="image-card">
                <img src="{url}" alt="{alt_text}" loading="lazy" onerror="this.src='data:image/svg+xml,%3Csvg xmlns=%22http://www.w3.org/2000/svg%22 width=%22200%22 height=%22200%22%3E%3Crect fill=%22%23ddd%22 width=%22200%22 height=%22200%22/%3E%3Ctext x=%2250%25%22 y=%2250%25%22 text-anchor=%22middle%22 dy=%22.3em%22 fill=%22%23999%22%3EImage not found%3C/text%3E%3C/svg%3E'">
                <div class="image-info">
                    <label class="checkbox-label">
                        <input type="checkbox" class="image-checkbox" data-imgurl="{url}" data-index="{idx}"
                            onchange="
                                const panel = document.getElementById('panel-{idx}');
                                const checkedCount = document.querySelectorAll('.image-checkbox:checked').length;
                                if (this.checked) {{
                                    if (checkedCount > 3) {{
                                        this.checked = false;
                                        alert('Maximum 3 images can be selected!');
                                        return;
                                    }}
                                    panel.classList.add('visible');
                                }} else {{
                                    panel.classList.remove('visible');
                                }}
                            ">
                        Select #{idx + 1}
                    </label>
                    <div class="alt-text">Current alt_text: {alt_text}</div>

                    <div id="panel-{idx}" class="assessment-panel">
                        <div class="form-group">
                            <label>Rate current alt-text:</label>
                            <div class="range-container">
                                <input type="range" min="1" max="5" value="3"
                                    class="assessment-range" data-index="{idx}"
                                    oninput="document.getElementById('range-value-{idx}').textContent = this.value">
                                <span id="range-value-{idx}" class="range-value">3</span>
                            </div>
                        </div>
                        <div class="form-group">
                            <label>New alt-text:</label>
                            <textarea class="new-alt-text" data-index="{idx}" rows="3" placeholder="Enter improved alt-text...">{alt_text}</textarea>
                        </div>
                    </div>

                    <input type="hidden" class="original-alt" data-index="{idx}" value="{alt_text}" />
                </div>
            </div>
            """
            info_text += f"✓ Image {idx+1} alt_text: {alt_text}\n"
        html += "</div>"

        return info_text, html

    except json.JSONDecodeError as e:
        return f"Error: Invalid JSON format - {str(e)}", ""
    except Exception as e:
        return f"Error: {str(e)}", ""
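
# Expected input shape for load_images_from_json, inferred from the accessors
# above (the same payload the /extract_images endpoint returns):
# {"images": [{"url": "https://...", "alt_text": "..."}]}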

def load_llm_assessment_from_json(json_input):

    try:
        # Parse JSON input
        data = json_input

        if "mllm_validations" not in data or not data["mllm_validations"]:
            print("no mllm_validations found")
            return "No mllm_validations found in JSON"

        info_text = f"Assessment done on {len(data['mllm_validations']['mllm_alttext_assessments'])} image(s)\n\n"
        print(
            f"Assessment done on {len(data['mllm_validations']['mllm_alttext_assessments'])} image(s)"
        )

        for idx, img_data in enumerate(
            data["mllm_validations"]["mllm_alttext_assessments"], 1
        ):
            original_alt_text_assessment = img_data["mllm_response"].get(
                "original_alt_text_assessment", "No description"
            )
            new_alt_text = img_data["mllm_response"].get(
                "new_alt_text", "No description"
            )
            alt_text_original = img_data.get("alt_text", "No alt_text provided")

            info_text += f"✓ alt_text original: {alt_text_original}. LLM assessment: {original_alt_text_assessment} => LLM proposed alt_text: {new_alt_text}\n"

        return info_text

    except json.JSONDecodeError as e:
        return f"Error: Invalid JSON format - {str(e)}"
    except Exception as e:
        return f"Error: {str(e)}"

def make_alttext_llm_assessment_api_call(
    url, selected_images_json=[], number_of_images=30
):
    print(f"Making API call to {url}")
    selected_images = json.loads(selected_images_json) if selected_images_json else []
    print("selected_images:", selected_images)

    if not selected_images or len(selected_images) == 0:
        info_text = "No images selected"
        return info_text

    selected_urls = []
    for img in selected_images:
        selected_urls.append(img["image_url"])
    try:
        response = call_API_urlibrequest(
            data={
                "page_url": url,
                "number_of_images": number_of_images,
                "context_levels": 5,
                "pixel_distance_threshold": 200,
                "save_images": "True",
                "save_elaboration": "True",
                "specific_images_urls": selected_urls,
            },
            url="http://localhost:8000/wcag_alttext_validation",
            headers=WCAG_VALIDATOR_RESTSERVER_HEADERS,
        )
        # return response
        info_text = load_llm_assessment_from_json(response)

        return info_text
    except Exception as e:
        return f"Error: {str(e)}"


def make_image_extraction_api_call(url, number_of_images=30):
    print(f"Making API call to {url}")
    try:
        response = call_API_urlibrequest(
            data={
                "page_url": url,
                "number_of_images": number_of_images,
            },
            url="http://localhost:8000/extract_images",
            headers=WCAG_VALIDATOR_RESTSERVER_HEADERS,
        )
        # return response
        info_text, gallery_images = load_images_from_json(response)

        return info_text, gallery_images
    except Exception as e:
        return f"Error: {str(e)}", ""
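
# Example usage (sketch): exercise the extraction helper outside the UI; it
# assumes wcag_validator_RESTserver.py is already running on localhost:8000.
# info_text, gallery_html_str = make_image_extraction_api_call(
#     "https://www.bbc.com", number_of_images=5
# )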

# ------- Gradio Interface -------#

# Global variable to hold database connection
connection_db = db_persistence_startup(table="wcag_user_assessments")

# Create Gradio interface
with gr.Blocks(theme="Insuz/SimpleIndigo", title="WCAG AI Validator") as demo:

    # Use the global connection_db reference
    print("Database connection reference available globally")

    gr.Markdown("# WCAG AI Validator UI")

    with gr.Tab("Alt Text Assessment"):
        with gr.Row():
            with gr.Column():

                with gr.Row():
                    with gr.Column():
                        url_input = gr.Dropdown(
                            url_list,
                            value=url_list[0],
                            multiselect=False,
                            label="Select a URL",
                            info="Select a URL to load in the iframe",
                        )
                    with gr.Column():
                        image_extraction_api_call_btn = gr.Button(
                            "Extract Images & Alt Text", variant="primary"
                        )
                        alttext_api_call_btn = gr.Button(
                            "Alt Text LLM Assessment",
                            variant="secondary",
                            interactive=False,
                        )

                with gr.Row():
                    image_info_output = gr.Textbox(label="Original alt-text", lines=5)
                    alttext_info_output = gr.Textbox(label="LLM Assessment", lines=5)

                with gr.Row():
                    gallery_html = gr.HTML(label="Image Gallery")

        image_extraction_api_call_btn.click(
            fn=lambda: ("", "", "", gr.Button(interactive=False)),
            inputs=[],
            outputs=[
                image_info_output,
                gallery_html,
                alttext_info_output,
                alttext_api_call_btn,
            ],
        ).then(
            make_image_extraction_api_call,
            inputs=[url_input],
            outputs=[image_info_output, gallery_html],
        ).then(
            fn=lambda: gr.Button(interactive=True),
            inputs=[],
            outputs=[alttext_api_call_btn],
        )

        # Process selected images with JavaScript
        alttext_api_call_btn.click(
            fn=make_alttext_llm_assessment_api_call,
            inputs=[url_input, gallery_html],
            outputs=[alttext_info_output],
            js="""
            (url_input, gallery_html) => {
                const checkboxes = document.querySelectorAll('.image-checkbox:checked');
                if (checkboxes.length === 0) {
                    alert('Please select at least one image!');
                    return [url_input, JSON.stringify([])];
                }
                if (checkboxes.length > 3) {
                    alert('Please select a maximum of 3 images!');
                    return [url_input, JSON.stringify([])];
                }
                const selectedData = [];

                checkboxes.forEach(checkbox => {
                    const index = checkbox.dataset.index;
                    const imageUrl = checkbox.dataset.imgurl;
                    const originalAlt = document.querySelector('.original-alt[data-index="' + index + '"]').value;
                    const assessment = document.querySelector('.assessment-range[data-index="' + index + '"]').value;
                    const newAltText = document.querySelector('.new-alt-text[data-index="' + index + '"]').value;

                    selectedData.push({
                        image_url: imageUrl,
                        original_alt_text: originalAlt,
                        assessment: parseInt(assessment),
                        new_alt_text: newAltText
                    });
                });

                return [url_input, JSON.stringify(selectedData)];
            }
            """,
        )


if __name__ == "__main__":
    # connection_db = db_persistence_startup(table="wcag_user_assessments")
    demo.launch()

@ -8,6 +8,8 @@ import argparse
 from dependences.utils import disclaim_bool_string, prepare_output_folder, create_folder
 import requests
 import os
+import urllib.parse
+from pathlib import Path


 class ImageExtractor:

@ -53,29 +55,51 @@ class ImageExtractor:
         # Also check query parameters (e.g., format=jpeg)
         return any(fmt in img_url.lower() for fmt in self.SUPPORTED_FORMATS)

     async def _download_image(self, image_url, output_dir="images") -> None:

+        # Parse the URL to get the path without query parameters
+        parsed_url = urllib.parse.urlparse(image_url)
+        url_path = parsed_url.path
+
+        # Get the filename from the path
+        filename = url_path.split("/")[-1]
+
+        # Split filename and extension
+        if "." in filename:
+            image_name, ext = filename.rsplit(".", 1)
+            ext = ext.lower()
+        else:
+            image_name = filename
+            ext = "jpg"
+
+        # Validate extension
+        if ext not in ["jpg", "jpeg", "png", "gif", "webp"]:
+            ext = "jpg"
+
+        # Sanitize image name (remove special characters, limit length)
+        image_name = "".join(c for c in image_name if c.isalnum() or c in ("-", "_"))
+        image_name = image_name[:200]  # Limit filename length
+
+        # If the name is empty after sanitization, create a hash-based name
+        if not image_name:
+            import hashlib
+
+            image_name = hashlib.md5(image_url.encode()).hexdigest()[:16]
+
+        # Download the image
+        print("getting image:", image_url)
+        response = requests.get(image_url, timeout=10)
+        response.raise_for_status()
+
         try:
-            # Get file extension from URL
-            ext = image_url.split(".")[-1].split("?")[0]
-            image_name = image_url.split("/")[-1][0 : -len(ext) - 1]
-
-            if ext not in ["jpg", "jpeg", "png", "gif", "webp"]:
-                ext = "jpg"
-
-            # Download the image
-            print("getting image:", image_url)
-            response = requests.get(image_url, timeout=10)
-            response.raise_for_status()
-
             # Save the image
             output_path = os.path.join(output_dir, f"{image_name}.{ext}")
             with open(output_path, "wb") as f:
                 f.write(response.content)

             print(f"Saved: {output_path}")

         except Exception as e:
-            print(f"Error downloading {image_url}: {e}")
+            print(f"Error saving image {image_url}: {e}")

     async def save_elaboration(self, images, output_dir) -> None:
         with open(output_dir, "w", encoding="utf-8") as f:

@ -306,7 +330,9 @@ class ImageExtractor:

         return metadata

-    async def extract_images(self, specific_images_urls=[]) -> List[Dict]:
+    async def extract_images(
+        self, extract_context=True, specific_images_urls=[]
+    ) -> List[Dict]:
         """
         Extract all images from the page with their metadata and context.

@ -318,40 +344,59 @@ class ImageExtractor:
             page = await browser.new_page()

             try:
                 # await page.goto(self.url, wait_until='networkidle')  # original

                 # ---alternative
                 # await page.goto(self.url, wait_until="networkidle")  # method 1: use if the page has unpredictable async content and there is the need to ensure everything loads
                 # The "networkidle" approach is generally more robust but slower, while the fixed timeout is faster but less adaptive to actual page behavior.
                 # ---alternative method 2: use if there is total awareness of the page's loading pattern and you want faster, more reliable execution
                 await page.goto(self.url, wait_until="load")
                 # Wait for the page to load completely
                 await page.wait_for_timeout(2000)  # Wait for dynamic content
                 # -----

-                # Get page metadata once
-                page_metadata = await self._get_page_metadata(page)
+                if extract_context:
+                    # Get page metadata once
+                    page_metadata = await self._get_page_metadata(page)
+                    page_title = page_metadata["title"]
+                    page_description = page_metadata["description"]
+                    page_keywords = page_metadata["keywords"]
+                    page_headings = page_metadata["headings"]
+                else:
+                    page_title = ""
+                    page_description = ""
+                    page_keywords = ""
+                    page_headings = []

                 if len(specific_images_urls) == 0:
                     # Find all img elements
-                    print("Extracting all images from the page")
+                    print("Extracting all images from the page", self.url)
                     img_elements = await page.locator("img").all()
                 else:
-                    print("Extracting specific images from the page:", specific_images_urls)
+                    print(
+                        "Extracting specific images from the page:",
+                        self.url,
+                        specific_images_urls,
+                    )
                     img_elements = []
                     for url in specific_images_urls:
                         try:
                             img_element = await page.locator(
                                 f'img[src="{url}"]'
                             ).first.element_handle()  # Use .first to get only the first match
                             if img_element:
                                 img_elements.append(img_element)
                         except Exception as e:
                             print(f"Error locating image with src {url}: {str(e)}")

                 image_source_list = []  # avoid multiple checks for the same image url
                 images_data = []

                 for img in img_elements:
-                    if len(images_data) >= self.number_of_images:  # limits the effective image list based on the init param.
-                        print("Reached the maximum number of images to extract.", self.number_of_images)
+                    if (
+                        len(images_data) >= self.number_of_images
+                    ):  # limits the effective image list based on the init param.
+                        print(
+                            "Reached the maximum number of images to extract.",
+                            self.number_of_images,
+                        )
                         break
                     try:
                         # Get image src

@ -373,7 +418,9 @@ class ImageExtractor:
                         # Verify format
                         if not self._is_supported_format(img_url):
                             print(
-                                "image format not supported for url:", img_url, ". Skipped."
+                                "image format not supported for url:",
+                                img_url,
+                                ". Skipped.",
                             )
                             continue

@ -386,10 +433,13 @@ class ImageExtractor:
                         # Get alt text
                         alt_text = await img.get_attribute("alt") or ""

-                        # Get surrounding HTML context (full, immediate, and nearby)
-                        html_context, immediate_context, nearby_text = (
-                            await self._get_element_context(page, img)
-                        )
+                        if extract_context:
+                            # Get surrounding HTML context (full, immediate, and nearby)
+                            html_context, immediate_context, nearby_text = (
+                                await self._get_element_context(page, img)
+                            )
+                        else:
+                            html_context, immediate_context, nearby_text = "", "", ""

                         # Compile image data
                         image_info = {

@ -399,10 +449,10 @@ class ImageExtractor:
                             "immediate_context": immediate_context,
                             "nearby_text": nearby_text,
                             "page_url": self.url,
-                            "page_title": page_metadata["title"],
-                            "page_description": page_metadata["description"],
-                            "page_keywords": page_metadata["keywords"],
-                            "page_headings": page_metadata["headings"],
+                            "page_title": page_title,
+                            "page_description": page_description,
+                            "page_keywords": page_keywords,
+                            "page_headings": page_headings,
                         }

                         images_data.append(image_info)

@ -20,11 +20,15 @@ class MLLMManager:
         response = call_API_urlibrequest(
             url=self.end_point, headers=headers, data=payload
         )
-
-        if openai_model:
-            model_response = response["choices"][0]["message"]["content"]
-        else:
-            model_response = response["message"]["content"]
+        try:
+            if openai_model:
+                model_response = response["choices"][0]["message"]["content"]
+            else:
+                model_response = response["message"]["content"]
+        except Exception as e:
+            print("Error getting model response:", e)
+            model_response = {}

         return model_response
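
For reference, the two response shapes the try/except above distinguishes (a sketch: the first is the OpenAI chat-completions layout; the second matches an Ollama-style chat reply, which is an assumption about the non-OpenAI backend):

# openai_model=True  -> {"choices": [{"message": {"content": "..."}}], ...}
# openai_model=False -> {"message": {"content": "..."}, ...}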
@ -0,0 +1,83 @@

from fastapi import APIRouter, Request
from fastapi.responses import JSONResponse
import logging
from pydantic import BaseModel
import json
from typing import Dict, List
from datetime import datetime, timezone

from dependences.utils import (
    disclaim_bool_string,
    prepare_output_folder,
    create_folder,
    db_persistence_insert,
)
from dependences.image_extractor import ImageExtractor
from dependences.mllm_management import MLLMManager, parse_mllm_alt_text_response

invalid_json_input_msg = "Invalid JSON format"
unexpected_error_msg = "Unexpected Error: could not end the process"


class ExtractImages(BaseModel):
    page_url: str = "https://www.bbc.com"
    number_of_images: int = 10


class ExtractImagesRoutes:

    def __init__(self):

        self.router = APIRouter()

        self.router.add_api_route(
            "/extract_images",
            self.extract_images,
            methods=["POST"],
            tags=["Basic Elaboration"],
            description="extract images from a webpage",
            name="Extract images and context",
            dependencies=[],
        )

        logging.info("extract images routes correctly initialized.")

    async def extract_images(
        self, request: Request, data: ExtractImages
    ) -> JSONResponse:
        """Extract images and their alt text from a webpage"""
        try:
            json_content = json.loads(data.model_dump_json())

            # ---------------------

            # Create extractor
            image_extractor = ImageExtractor(
                json_content["page_url"],
                context_levels=0,
                pixel_distance_threshold=0,
                number_of_images=json_content["number_of_images"],
                save_images="False",
                save_images_path="",
            )
            # Extract images
            logging.info(f"Extracting images from: {json_content['page_url']}")
            images = await image_extractor.extract_images(extract_context=False)

            returned_object = {
                "images": images,
            }

            return JSONResponse(content=returned_object, status_code=200)

        except json.JSONDecodeError:
            logging.error(invalid_json_input_msg)
            return JSONResponse(
                content={"error": invalid_json_input_msg}, status_code=400
            )

        except Exception as e:
            logging.error(unexpected_error_msg + " %s", e)
            return JSONResponse(
                content={"error": unexpected_error_msg}, status_code=500
            )
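
A request matching the ExtractImages model above would look like this (a sketch; the field defaults are the model's own), and the response is the {"images": [...]} payload consumed by load_images_from_json in ui_alt_text.py:

# POST /extract_images
# {"page_url": "https://www.bbc.com", "number_of_images": 10}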

@ -24,8 +24,8 @@ class WCAGAltTextValuation(BaseModel):
     context_levels: int = 5
     pixel_distance_threshold: int = 200
     number_of_images: int = 10
-    save_images: bool = True
-    save_elaboration: bool = True
+    save_images: str = "True"
+    save_elaboration: str = "True"
     specific_images_urls: List[str] = []


@ -37,18 +37,18 @@ class WCAGAltTextValuationRoutes:
         self.router = APIRouter()

         self.router.add_api_route(
-            "/wgag_alttext_validation",
-            self.wgag_alttext_validation,
+            "/wcag_alttext_validation",
+            self.wcag_alttext_validation,
             methods=["POST"],
             tags=["Wcag Alt Text Validation"],
             description="WCAG validator alt_text validation",
-            name="wgag alttext validation",
+            name="wcag alttext validation",
             dependencies=[],
         )

         logging.info("wcag alttext routes correctly initialized.")

-    async def wgag_alttext_validation(
+    async def wcag_alttext_validation(
         self, request: Request, data: WCAGAltTextValuation
     ) -> JSONResponse:
         """Return the alt text validation assessment based on WCAG guidelines"""
@ -111,7 +111,7 @@
      "name": "stderr",
      "output_type": "stream",
      "text": [
-      "C:\\Users\\nicola\\AppData\\Local\\Temp\\ipykernel_14420\\1344219625.py:6: DeprecationWarning: __array__ implementation doesn't accept a copy keyword, so passing copy=False failed. __array__ must implement 'dtype' and 'copy' keyword arguments. To learn more, see the migration guide https://numpy.org/devdocs/numpy_2_0_migration_guide.html#adapting-to-changes-in-the-copy-keyword\n",
+      "C:\\Users\\nicola\\AppData\\Local\\Temp\\ipykernel_20916\\1344219625.py:6: DeprecationWarning: __array__ implementation doesn't accept a copy keyword, so passing copy=False failed. __array__ must implement 'dtype' and 'copy' keyword arguments. To learn more, see the migration guide https://numpy.org/devdocs/numpy_2_0_migration_guide.html#adapting-to-changes-in-the-copy-keyword\n",
       " return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))\n"
      ]
     }

@ -931,7 +931,7 @@
    },
    {
     "cell_type": "code",
-    "execution_count": 11,
+    "execution_count": 12,
     "id": "3992a33f",
     "metadata": {},
     "outputs": [

@ -942,7 +942,7 @@
       " \"Damaged homes and wasteland in Pokrovsk, Ukraine with smoke rising, highlighting war's impact on the city.\")"
      ]
     },
-    "execution_count": 11,
+    "execution_count": 12,
     "metadata": {},
     "output_type": "execute_result"
    }

@ -963,7 +963,7 @@
    },
    {
     "cell_type": "code",
-    "execution_count": 12,
+    "execution_count": 13,
     "id": "c1dad7b8",
     "metadata": {},
     "outputs": [

@ -984,7 +984,7 @@
      "name": "stderr",
      "output_type": "stream",
      "text": [
-      "C:\\Users\\nicola\\AppData\\Local\\Temp\\ipykernel_14420\\1344219625.py:6: DeprecationWarning: __array__ implementation doesn't accept a copy keyword, so passing copy=False failed. __array__ must implement 'dtype' and 'copy' keyword arguments. To learn more, see the migration guide https://numpy.org/devdocs/numpy_2_0_migration_guide.html#adapting-to-changes-in-the-copy-keyword\n",
+      "C:\\Users\\nicola\\AppData\\Local\\Temp\\ipykernel_20916\\1344219625.py:6: DeprecationWarning: __array__ implementation doesn't accept a copy keyword, so passing copy=False failed. __array__ must implement 'dtype' and 'copy' keyword arguments. To learn more, see the migration guide https://numpy.org/devdocs/numpy_2_0_migration_guide.html#adapting-to-changes-in-the-copy-keyword\n",
       " return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))\n"
      ]
     },

@ -994,7 +994,7 @@
       "np.float64(0.5812176442146302)"
      ]
     },
-    "execution_count": 12,
+    "execution_count": 13,
     "metadata": {},
     "output_type": "execute_result"
    }

@ -1014,7 +1014,7 @@
    },
    {
     "cell_type": "code",
-    "execution_count": 13,
+    "execution_count": 14,
     "id": "1c2d1cff",
     "metadata": {},
     "outputs": [

@ -1025,7 +1025,7 @@
       " [0.70703788, 1. ]])"
      ]
     },
-    "execution_count": 13,
+    "execution_count": 14,
     "metadata": {},
     "output_type": "execute_result"
    }

@ -1094,7 +1094,7 @@
    },
    {
     "cell_type": "code",
-    "execution_count": 16,
+    "execution_count": 15,
     "id": "b6ff8518",
     "metadata": {},
     "outputs": [

@ -1104,7 +1104,7 @@
       "(2, 768)"
      ]
     },
-    "execution_count": 16,
+    "execution_count": 15,
     "metadata": {},
     "output_type": "execute_result"
    }

@ -1119,7 +1119,26 @@
    },
    {
     "cell_type": "code",
-    "execution_count": 17,
+    "execution_count": 32,
+    "id": "6310f4b2",
+    "metadata": {},
+    "outputs": [
+     {
+      "name": "stdout",
+      "output_type": "stream",
+      "text": [
+       "None\n"
+      ]
+     }
+    ],
+    "source": [
+     "# to check whether it uses a default prompt_name to differentiate tasks, as advanced models like gemma do\n",
+     "print(model.default_prompt_name )"
+    ]
+   },
+   {
+    "cell_type": "code",
+    "execution_count": 16,
     "id": "2eb31bbb",
     "metadata": {},
     "outputs": [

@ -1130,7 +1149,7 @@
       " [0.82111526, 1. ]], dtype=float32)"
      ]
     },
-    "execution_count": 17,
+    "execution_count": 16,
     "metadata": {},
     "output_type": "execute_result"
    }

@ -1142,7 +1161,7 @@
    },
    {
     "cell_type": "code",
-    "execution_count": 18,
+    "execution_count": 17,
     "id": "93a846e4",
     "metadata": {},
     "outputs": [

@ -1152,7 +1171,7 @@
       "np.float32(0.8211156)"
      ]
     },
-    "execution_count": 18,
+    "execution_count": 17,
     "metadata": {},
     "output_type": "execute_result"
    }

@ -1164,7 +1183,7 @@
    },
    {
     "cell_type": "code",
-    "execution_count": 19,
+    "execution_count": 18,
     "id": "a7cf3288",
     "metadata": {},
     "outputs": [

@ -1176,7 +1195,7 @@
       " 'cosine')"
      ]
     },
-    "execution_count": 19,
+    "execution_count": 18,
     "metadata": {},
     "output_type": "execute_result"
    }
@ -18,6 +18,7 @@ from restserver.routers import (
     routes_health,
     routes_local_db,
     routes_wcag_alttext,
+    routes_extract_images,
 )

 from dependences.utils import (

@ -44,10 +45,12 @@ def server(connection_db, mllm_settings):
     wcag_alttext_routes = routes_wcag_alttext.WCAGAltTextValuationRoutes(
         connection_db, mllm_settings
     )
+    extract_images_routes = routes_extract_images.ExtractImagesRoutes()

     app.include_router(health_routes.router, prefix="")
     app.include_router(local_db_routes.router, prefix="")
     app.include_router(wcag_alttext_routes.router, prefix="")
+    app.include_router(extract_images_routes.router, prefix="")
     return app