double LLM assessments and UI update
This commit is contained in:
parent f81c5aad2b
commit 3d0936448b
@@ -31,25 +31,26 @@ import sqlite3

 WCAG_VALIDATOR_RESTSERVER_HEADERS = [("Content-Type", "application/json")]


-def process_dataframe(db_path, url, updated_df, user_state={}):
+def process_dataframe(db_path, url, updated_df, user_state={}, llm_response_output={}):

     print("Processing dataframe to adjust columns...")
-    column_rating_name = "User Assessment for LLM Proposal"
+    for column_rating_name in ["User Assessment for LLM Proposal 1", "User Assessment for LLM Proposal 2"]:

-    # Get the assessment column
-    try:
-        updated_df[column_rating_name] = updated_df[column_rating_name].astype(int)
-    except ValueError:
-        return "Error: User Assessment for LLM Proposal must be an integer"
+        # Get the assessment column
+        try:
+            updated_df[column_rating_name] = updated_df[column_rating_name].astype(int)
+        except ValueError:
+            return "Error: User Assessment for LLM Proposal must be an integer"

-    if (updated_df[column_rating_name] < 1).any() or (
-        updated_df[column_rating_name] > 5
-    ).any():
-        return "Error: User Assessment for LLM Proposal must be between 1 and 5"
+        if (updated_df[column_rating_name] < 1).any() or (
+            updated_df[column_rating_name] > 5
+        ).any():
+            return "Error: User Assessment for LLM Proposal must be between 1 and 5"

     dataframe_json = updated_df.to_json(orient="records")
     connection_db = sqlite3.connect(db_path)
     json_user_str = json.dumps({"username": user_state["username"]}, ensure_ascii=False)
+    lm_response_output_str = json.dumps(llm_response_output, ensure_ascii=False)  # retrieved from the LLM call; it carries all the info, including the images
     try:
         # insert after everything to keep datetime aligned
         db_persistence_insert(

@@ -58,8 +59,8 @@ def process_dataframe(db_path, url, updated_df, user_state={}):
             page_url=url,
             user=json_user_str,
             llm_model="",
-            json_in_str=dataframe_json,  # to improve
-            json_out_str="done via UI",
+            json_in_str=lm_response_output_str,  # dataframe_json, # to improve
+            json_out_str=dataframe_json,
             table="wcag_user_assessments",
         )
     except Exception as e:
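The per-column validation above follows a simple pattern: coerce to int, then range-check. A minimal, self-contained sketch of that pattern (the function name and sample data are illustrative, not from the commit):

```python
import pandas as pd

def validate_rating_columns(df, columns, lo=1, hi=5):
    # Coerce each rating column to int and range-check it, mirroring
    # the per-column loop in process_dataframe above.
    for col in columns:
        try:
            df[col] = df[col].astype(int)
        except ValueError:
            return f"Error: {col} must be an integer"
        if (df[col] < lo).any() or (df[col] > hi).any():
            return f"Error: {col} must be between {lo} and {hi}"
    return "ok"

df = pd.DataFrame({
    "User Assessment for LLM Proposal 1": ["3", "5"],
    "User Assessment for LLM Proposal 2": ["1", "4"],
})
print(validate_rating_columns(df, list(df.columns)))  # -> ok
```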
@@ -263,33 +264,93 @@ def load_llm_assessment_from_json(json_input):
             print("no mllm_validations found")
             return pd.DataFrame()

-        info_text = f"Assessment done on {len(data['mllm_validations']['mllm_alttext_assessments'])} image(s)\n\n"
-        print(
-            f"Assessment done on {len(data['mllm_validations']['mllm_alttext_assessments'])} image(s)"
-        )
+        if (
+            data["mllm_validations"]["mllm_alttext_assessments"].get("mllm_alttext_assessments_openai")
+            and data["mllm_validations"]["mllm_alttext_assessments"].get("mllm_alttext_assessments_local")
+        ):
+            is_single_model_output = False
+            info_text = f"Assessment done by {len(data['mllm_validations']['mllm_alttext_assessments'])} models on {len(data['mllm_validations']['mllm_alttext_assessments']['mllm_alttext_assessments_openai'])} image(s)\n\n"
+            print(
+                f"The response contains multiple models' output. Assessment done by {len(data['mllm_validations']['mllm_alttext_assessments'])} models on {len(data['mllm_validations']['mllm_alttext_assessments']['mllm_alttext_assessments_openai'])} image(s)"
+            )
+        else:
+            is_single_model_output = True
+            info_text = f"Assessment done on {len(data['mllm_validations']['mllm_alttext_assessments'])} image(s)\n\n"
+            print(
+                f"The response contains only one output. Assessment done on {len(data['mllm_validations']['mllm_alttext_assessments'])} image(s)"
+            )

         data_frame = []
-        for idx, img_data in enumerate(
-            data["mllm_validations"]["mllm_alttext_assessments"], 1
-        ):
-            original_alt_text_assessment = img_data["mllm_response"].get(
-                "original_alt_text_assessment", "No description"
-            )
-            new_alt_text = img_data["mllm_response"].get(
-                "new_alt_text", "No description"
-            )
-            alt_text_original = img_data.get("alt_text", "No alt_text provided")
+        if is_single_model_output:
+            for idx, img_data in enumerate(
+                data["mllm_validations"]["mllm_alttext_assessments"], 1
+            ):
+                original_alt_text_assessment = img_data["mllm_response"].get(
+                    "original_alt_text_assessment", "No description"
+                )
+                new_alt_text = img_data["mllm_response"].get(
+                    "new_alt_text", "No description"
+                )
+                alt_text_original = img_data.get("alt_text", "No alt_text provided")

-            data_frame.append(
-                {
-                    "Original Alt Text": alt_text_original,
-                    "LLM Assessment": original_alt_text_assessment,
-                    "LLM Proposed Alt Text": new_alt_text,
-                }
-            )
+                data_frame.append(
+                    {
+                        "Original Alt Text": alt_text_original,
+                        "LLM Assessment": original_alt_text_assessment,
+                        "LLM Proposed Alt Text": new_alt_text,
+                    }
+                )
+        else:
+            for idx, img_data in enumerate(
+                data["mllm_validations"]["mllm_alttext_assessments"]["mllm_alttext_assessments_openai"], 1
+            ):
+                original_alt_text_assessment = img_data["mllm_response"].get(
+                    "original_alt_text_assessment", "No description"
+                )
+                new_alt_text = img_data["mllm_response"].get(
+                    "new_alt_text", "No description"
+                )
+                alt_text_original = img_data.get("alt_text", "No alt_text provided")
+
+                """data_frame.append(
+                    {
+                        "Original Alt Text": alt_text_original,
+                        "LLM Assessment": original_alt_text_assessment,
+                        "LLM Proposed Alt Text": new_alt_text,
+                    }
+                )"""
+                # for idx, img_data in enumerate(
+                #     data["mllm_validations"]["mllm_alttext_assessments"]["mllm_alttext_assessments_local"], 1
+                # ):
+                img_data_local = data["mllm_validations"]["mllm_alttext_assessments"]["mllm_alttext_assessments_local"][idx - 1]
+                original_alt_text_assessment_local = img_data_local["mllm_response"].get(
+                    "original_alt_text_assessment", "No description"
+                )
+                new_alt_text_local = img_data_local["mllm_response"].get(
+                    "new_alt_text", "No description"
+                )
+                # alt_text_original = img_data.get("alt_text", "No alt_text provided")
+
+                data_frame.append(
+                    {
+                        "Original Alt Text": alt_text_original,
+                        "LLM Assessment 1": original_alt_text_assessment,
+                        "LLM Proposed Alt Text 1": new_alt_text,
+                        "LLM Assessment 2": original_alt_text_assessment_local,
+                        "LLM Proposed Alt Text 2": new_alt_text_local,
+                    }
+                )

         df = pd.DataFrame(data_frame)

         return df

     except json.JSONDecodeError as e:
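The two-model branch pairs the remote and local lists positionally via `img_data_local = ...[idx - 1]`, which silently assumes both models returned the same number of images in the same order. A sketch (not the committed code, sample data invented) of making that pairing explicit with `zip`, which also stops safely at the shorter list:

```python
# Hypothetical minimal data in the shape of the two assessment lists.
openai_results = [{"mllm_response": {"new_alt_text": "a red bus"}}]
local_results = [{"mllm_response": {"new_alt_text": "a double-decker bus"}}]

rows = []
for img_openai, img_local in zip(openai_results, local_results):
    rows.append({
        "LLM Proposed Alt Text 1": img_openai["mllm_response"].get("new_alt_text", "No description"),
        "LLM Proposed Alt Text 2": img_local["mllm_response"].get("new_alt_text", "No description"),
    })
print(rows)
```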
@@ -326,7 +387,9 @@ def make_alttext_llm_assessment_api_call(
     user_assessments = []
     user_new_alt_texts = []
     selected_image_id = []
-    user_assessments_llm_proposal = []
+    user_assessments_llm_proposal_1 = []
+    user_assessments_llm_proposal_2 = []

     for img in selected_images:
         selected_urls.append(img["image_url"])
         selected_alt_text_original.append(img["original_alt_text"])

@@ -335,7 +398,8 @@ def make_alttext_llm_assessment_api_call(
         selected_image_id.append(
             int(img["image_index"]) + 1
         )  # add the id selected (+1 for index alignment)
-        user_assessments_llm_proposal.append(3)  # default value for now
+        user_assessments_llm_proposal_1.append(3)  # default value for now
+        user_assessments_llm_proposal_2.append(3)  # default value for now
     json_in_str["images_urls"] = selected_urls
     json_in_str["images_alt_text_original"] = selected_alt_text_original
     json_out_str["user_assessments"] = user_assessments
@@ -363,6 +427,7 @@ def make_alttext_llm_assessment_api_call(
         )
         # return response
         info_dataframe = load_llm_assessment_from_json(response)
+        # print("info_dataframe:", info_dataframe)

         # add the UI ids and other fields to the api response
         info_dataframe.insert(

@@ -371,9 +436,13 @@ def make_alttext_llm_assessment_api_call(
         info_dataframe.insert(2, "User Assessment", user_assessments)

         info_dataframe.insert(3, "User Proposed Alt Text", user_new_alt_texts)
-        info_dataframe["User Assessment for LLM Proposal"] = (
-            user_assessments_llm_proposal
+        info_dataframe["User Assessment for LLM Proposal 1"] = (
+            user_assessments_llm_proposal_1
         )
+        info_dataframe["User Assessment for LLM Proposal 2"] = (
+            user_assessments_llm_proposal_2
+        )
+        print("info_dataframe after adding user assessments:", info_dataframe)

     except Exception as e:
         return {"error": str(e)}
@@ -395,7 +464,7 @@ def make_alttext_llm_assessment_api_call(
     finally:
         if connection_db:
             connection_db.close()
-    return "LLM assessment completed", info_dataframe
+    return "LLM assessment completed", info_dataframe, response


 def make_image_extraction_api_call(
@@ -546,9 +615,12 @@ with gr.Blocks(theme=gr.themes.Glass(), title="WCAG AI Validator") as demo:
                     "Original Alt Text",
                     "User Assessment",
                     "User Proposed Alt Text",
-                    "LLM Assessment",
-                    "LLM Proposed Alt Text",
-                    "User Assessment for LLM Proposal",
+                    "LLM Assessment 1",
+                    "LLM Proposed Alt Text 1",
+                    "User Assessment for LLM Proposal 1",
+                    "LLM Assessment 2",
+                    "LLM Proposed Alt Text 2",
+                    "User Assessment for LLM Proposal 2",
                 ],
                 label="LLM Assessment Results",
                 wrap=True,  # Wrap text in cells

@@ -566,6 +638,8 @@ with gr.Blocks(theme=gr.themes.Glass(), title="WCAG AI Validator") as demo:
                 "ℹ Info: to assess the LLM output, only the values for the 'User Assessment for LLM Proposal' column need to be changed."
             )

+            llm_response_output = gr.JSON()
+
             with gr.Row():
                 gallery_html = gr.HTML(label="Image Gallery")

@@ -600,7 +674,7 @@ with gr.Blocks(theme=gr.themes.Glass(), title="WCAG AI Validator") as demo:
                 wcag_rest_server_url_state,
                 user_state,
             ],
-            outputs=[image_info_output, alttext_info_output],
+            outputs=[image_info_output, alttext_info_output, llm_response_output],
             js="""
             (url_input,gallery_html) => {
                 const checkboxes = document.querySelectorAll('.image-checkbox:checked');

@@ -642,7 +716,7 @@ with gr.Blocks(theme=gr.themes.Glass(), title="WCAG AI Validator") as demo:

         save_user_assessment_btn.click(
             fn=process_dataframe,
-            inputs=[db_path_state, url_input, alttext_info_output, user_state],
+            inputs=[db_path_state, url_input, alttext_info_output, user_state, llm_response_output],
             outputs=[image_info_output],
         )
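The UI change threads the raw API response through a `gr.JSON` component so a later callback (`process_dataframe`) can receive it back. A minimal, self-contained sketch of that carrier pattern (component and function names here are illustrative):

```python
import gradio as gr

# One callback returns the raw payload into a gr.JSON component; a later
# callback receives it back as an input, without any global state.
def produce():
    return {"answer": 42}

def consume(payload):
    return f"got: {payload}"

with gr.Blocks() as demo:
    raw = gr.JSON()       # holds the raw LLM response between clicks
    out = gr.Textbox()
    gr.Button("Fetch").click(fn=produce, outputs=[raw])
    gr.Button("Save").click(fn=consume, inputs=[raw], outputs=[out])

# demo.launch()
```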
@@ -17,129 +17,21 @@ class LanguageExtractor:
     def __init__(
         self,
         url: str,
+        short_segments_length_threshold: int = 30,
+        max_total_length: int = 15000,
     ):
         self.url = url
+        self.short_segments_length_threshold = short_segments_length_threshold
+        self.max_total_length = max_total_length

-    async def extract_languages(self, extract_context=True) -> Dict:
-
-        async with async_playwright() as p:
-            browser = await p.chromium.launch(headless=True)
-            page = await browser.new_page()
-
-            try:
-                # await page.goto(self.url, timeout=50000, wait_until="load")
-                # await page.wait_for_timeout(2000)
-                await page.goto(self.url, timeout=50000, wait_until="domcontentloaded")  # faster in this case, we just need the DOM to be loaded, not necessarily all the resources
-
-                lang_only_elements = []
-                lang_and_xml_lang_elements = []
-
-                # Extract the lang attribute of the <html> tag
-                html_tag = page.locator('html')
-                html_tag_lang = await html_tag.get_attribute('lang')
-                html_tag_xml_lang = await html_tag.get_attribute('xml:lang')
-
-                if html_tag_lang and html_tag_xml_lang:
-                    lang_and_xml_lang_elements.append(
-                        f'<html lang="{html_tag_lang}" xml:lang="{html_tag_xml_lang}"></html>'
-                    )
-                elif html_tag_lang:
-                    lang_only_elements.append(f'<html lang="{html_tag_lang}"></html>')
-
-                # Find all elements with the lang attribute (excluding <html>)
-                elements_with_lang = await page.locator('//*[@lang and not(self::html)]').all()
-
-                for element in elements_with_lang:
-                    outer_html = await element.evaluate('el => el.outerHTML')
-                    xml_lang = await element.get_attribute('xml:lang')
-                    if xml_lang:
-                        lang_and_xml_lang_elements.append(outer_html)
-                    else:
-                        lang_only_elements.append(outer_html)
-
-                return {
-                    "lang_only": "; ".join(lang_only_elements),
-                    "lang_and_xml": "; ".join(lang_and_xml_lang_elements)
-                }
-
-            except Exception as e:
-                print(f"Error extracting languages: {e}")
-                return {"error": str(e)}
-
-            finally:
-                await browser.close()
-
-
-    """
-    ## the one from nodejs
-    from playwright.async_api import Page
-
-    async def h58(page: Page):
-        results = []
-
-        try:
-            print("Identifying the main language of the page...")
-            # Identify the main language of the page
-            main_lang = "The main language of the page is: not specified"
-            try:
-                # Playwright uses locator() or query_selector()
-                html_element = page.locator('html')
-                lang_attribute = await html_element.get_attribute('lang')
-                if lang_attribute:
-                    main_lang = f"The main language of the page is: {lang_attribute}"
-            except Exception as e:
-                print(f"Error identifying main language: {e}")
-
-            print("Find all elements containing text")
-            # Find all elements containing text that don't have children (leaf nodes)
-            try:
-                # Playwright handles XPaths directly through the locator API
-                elements = await page.locator('//*[text() and not(*)]').all()
-            except Exception as e:
-                print(f"Error finding text elements: {e}")
-                return results
-
-            print("Create a string to collect the outer html of all the elements containing text...")
-            all_outer_html = ""
-
-            for element in elements:
-                try:
-                    # Get the tag name
-                    tag_name = await element.evaluate("el => el.tagName.toLowerCase()")
-
-                    # Skip <html>, <style> and <script> elements
-                    if tag_name in ['html', 'style', 'script']:
-                        continue
-
-                    # Get the outerHTML
-                    html_content = await element.evaluate("el => el.outerHTML")
-                    all_outer_html += html_content
-
-                    # Truncate at 15,000 characters to save tokens
-                    if len(all_outer_html) > 15000:
-                        all_outer_html = all_outer_html[:15000] + "(...continues)"
-                        break  # Stop processing once limit is reached to save time
-
-                except Exception as e:
-                    print(f"Error processing element: {e}")
-
-            # You can append the final result to your results list here
-            results.append({"main_lang": main_lang, "content": all_outer_html})
-
-        except Exception as e:
-            print(f"Unexpected error: {e}")
-
-        return results
-    """
     async def extract_content_with_lang_context(self) -> Dict:
         """
         The verification is:
         Read through all the text content on the page and identify any passages that are in a different language than the page default.
         Then check whether those passages have a lang attribute marking them correctly as being in a different language.
-        If a language change exists in the text but no lang attribute is present → that's a failure of H58"""
+        If a language change exists in the text but no lang attribute is present → that's a failure of H58
+        """

         async with async_playwright() as p:
             # Efficiently launch and manage the browser lifecycle
@@ -148,9 +40,10 @@ class LanguageExtractor:
             page = await context.new_page()

             results = {
                 "page_url": self.url,
                 "main_page_lang": "not specified",
                 "extracted_segments": [],
-                "total_char_count": 0
+                "total_char_count": 0,
             }

             try:

@@ -158,41 +51,50 @@ class LanguageExtractor:
                 await page.goto(self.url, timeout=50000, wait_until="domcontentloaded")

                 # 1. Get Root Language (Global Context)
-                html_tag = page.locator('html')
-                root_lang = await html_tag.get_attribute('lang') or "unknown"
+                html_tag = page.locator("html")
+                root_lang = await html_tag.get_attribute("lang") or "unknown"
+                results["page_url"] = self.url
                 results["main_page_lang"] = root_lang

                 # 2. Find Leaf Nodes containing text (The H58 Logic)
                 # We target elements with text but no child elements to get the 'cleanest' snippets
-                elements = await page.locator('//*[text() and not(*)]').all()
+                elements = await page.locator("//*[text() and not(*)]").all()

                 current_length = 0
-                max_length = 15000
+                max_length = self.max_total_length  # only considers the text content, not the HTML tags

                 for element in elements:
                     if current_length >= max_length:
-                        results["extracted_segments"].append("...[Truncated: Limit Reached]")
+                        results["extracted_segments"].append(
+                            "...[Truncated: Limit Reached]"
+                        )
                         break

                     try:
                         # Skip non-content tags
-                        tag_name = await element.evaluate("el => el.tagName.toLowerCase()")
-                        if tag_name in ['script', 'style', 'noscript', 'html']:
+                        tag_name = await element.evaluate(
+                            "el => el.tagName.toLowerCase()"
+                        )
+                        if tag_name in ["script", "style", "noscript", "html"]:
                             continue

                         # Get local language context (The extract_languages logic)
-                        local_lang = await element.get_attribute('lang')
-                        #outer_html = await element.evaluate("el => el.outerHTML")
+                        local_lang = await element.get_attribute("lang")
+                        # outer_html = await element.evaluate("el => el.outerHTML")
                         clean_text = await element.inner_text()
                         clean_text = clean_text.strip()
-                        if not clean_text:
+                        if (
+                            not clean_text
+                            or len(clean_text) < self.short_segments_length_threshold
+                        ):  # Skip very short text, which is unlikely to be meaningful for language detection
+                            # print(f"Skipping short text: '{clean_text}'")
                             continue

                         # Package the data: Text + its specific language metadata
                         segment = {
                             "tag": tag_name,
                             "lang": local_lang if local_lang else "inherited",
-                            "html": clean_text
+                            "html": clean_text,
                         }

                         results["extracted_segments"].append(segment)

@@ -202,10 +104,13 @@ class LanguageExtractor:
                         # Silently skip individual element errors to keep the loop moving
                         continue

-                results["total_char_count"] = current_length
+                results["total_char_count"] = (
+                    current_length  # only the text content, not the HTML tags, counts toward the total character count of the extracted content
+                )

                 return results

             except Exception as e:
                 return {"error": str(e)}
             finally:
                 await browser.close()
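For orientation, a sketch of the shape `extract_content_with_lang_context` produces and that the H58 prompt consumes (all values here are invented):

```python
# Illustrative output of the extractor; segments with lang == "inherited"
# rely on main_page_lang, and the H58 check flags them when their text is
# actually written in another language.
results = {
    "page_url": "https://example.com",
    "main_page_lang": "en",
    "extracted_segments": [
        {"tag": "p", "lang": "inherited", "html": "Welcome to our site, have a look around."},
        {"tag": "blockquote", "lang": "fr", "html": "Bonjour tout le monde, bienvenue sur notre site."},
    ],
    "total_char_count": 88,
}
```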
@@ -100,7 +100,7 @@ class MLLMManager:
         }
         return payload

-    # -------- alt text evaluation specific methods ---------
+    # -------- all the system prompts ---------

     def get_alt_text_system_prompt(self):

@@ -147,7 +147,8 @@ class MLLMManager:
 7. Generate the new most appropriate alt-text given the context and the steps before. Keep this within 30 words. Use the same natural language (e.g., English, Spanish, Italian) as the original alt-text.

 8. Here is the JSON format the results must have:
-{"Original alt-text assessment" : "*your original alt-text assessment*", "Assessment" : "*your assessment judgment*", "EvaluationResult": "*your response*", "New alt-text":"*new alt-text*"}"""
+```json{"Original alt-text assessment" : "*your original alt-text assessment*", "Assessment" : "*your assessment judgment*", "EvaluationResult": "*your response*", "New alt-text":"*new alt-text*"}```
+You MUST respond with ONLY a valid JSON array. No explanations, no comments, no markdown text outside the code block."""

         return system_prompt
@@ -181,7 +182,68 @@ class MLLMManager:
 5. Provide a brief reasoning for your judgment. Your response should be in English. Keep your response within 100 words.

 6. Here is the JSON format the result must have:
-{"Assessment" : "*your assessment*", "Judgment" : "*your judgment*", "EvaluationResult": "*your response*"}"""
+```json{"Assessment" : "*your assessment*", "Judgment" : "*your judgment*", "EvaluationResult": "*your response*"}```
+You MUST respond with ONLY a valid JSON object. No explanations, no comments, no markdown text outside the code block."""

         return system_prompt

+    def get_h58_system_prompt(self):  # NB: requires fine-tuning
+
+        # https://www.w3.org/WAI/WCAG22/Understanding/language-of-parts.html without examples
+
+        system_prompt = """You are a WCAG accessibility auditor specializing in multilingual content analysis. Your task is to evaluate whether a webpage correctly implements WCAG Technique H58: "Using language attributes to identify changes in the human language".
+Use the following explanation and examples to guide your evaluation:
+The human language of each passage or phrase in the content must be programmatically determined except for proper names,
+technical terms, words of indeterminate language, and words or phrases that have become part of the vernacular of the
+immediately surrounding text. The intent is to ensure that user agents can correctly present phrases, passages, and in some
+cases words written in multiple languages. This makes it possible for user agents and assistive technologies to present content
+according to the presentation and pronunciation rules for that language. Individual words or phrases in one language can become
+part of another language. For example, "rendezvous" is a French word that has been adopted in English, appears in English
+dictionaries, and is properly pronounced by English screen readers. Hence a passage of English text may contain the word
+"rendezvous" without specifying that its human language is French and still satisfy this Success Criterion.
+Most professions require frequent use of technical terms which may originate from a foreign language. Such terms are usually
+not translated to all languages. The universal nature of technical terms also facilitates communication between professionals.
+Some common examples of technical terms include: Homo sapiens, Alpha Centauri, hertz, and habeas corpus.
+
+1. You will be provided with the following:
+- The page default language declared at the top level (main_page_lang).
+- Each extracted segment with its own lang attribute, or inheriting the page default ("inherited").
+
+2. You must assess whether any text segment contains content in a language DIFFERENT from the page default, and whether that difference is correctly marked with a lang attribute.
+
+## Failure Condition (H58 violation):
+A segment FAILS if:
+- Its text content is in a different language than the page default
+- AND its lang is "inherited" or empty (i.e., no explicit lang override is present)
+In this case your judgment should be 'failure' and your assessment should be 1 or 2 depending on the severity of the violation.
+
+## Pass Condition:
+A segment PASSES if:
+- Its text is in the same language as the page default (lang override not required)
+- OR its text is in a different language AND a correct lang attribute is explicitly set
+In this case your judgment should be 'success' and your assessment should be 4 or 5 depending on the quality of the alignment.
+
+If you cannot determine with certainty, your judgment should be 'warning' and your assessment 3.
+
+Analyze each segment's text content to detect its actual language. Compare it against the page default language. Flag any segment where a language change occurs but is not declared via an explicit lang attribute.
+
+3. Include a brief reasoning for your judgment. Your response should be in English. Keep your response within 20 words.
+
+4. Here is the format your response must have, which is an array of JSON objects, one for each segment analyzed:
+```json
+[
+    {
+        "tag": "<html tag>",
+        "html": "<content snippet>",
+        "detected_lang": "<language you detected in the text>",
+        "declared_lang": "<lang attribute value or 'inherited'>",
+        "Assessment": "*your assessment*",
+        "Judgment": "*your judgment*",
+        "EvaluationResult": "*your response*"
+    }
+]```
+You MUST respond with ONLY a valid JSON array. No explanations, no comments, no markdown text outside the code block.
+"""
+
+        return system_prompt
@@ -218,9 +280,10 @@ class MLLMManager:
         for text in texts:
             user_prompt = user_prompt + " " + text
         user_prompt = {"user_prompt": user_prompt}

         return user_prompt

+    # --- all the evaluation specific methods ---------
     def make_alt_text_evaluation(
         self,
         images,
@@ -282,26 +345,72 @@ class MLLMManager:
         mllm_responses.append(report)
         return mllm_responses

+    # --- end of alt text evaluation specific methods ---------
+    # Helper method to keep the main logic clean
+    def execute_llm_call(self, system_prompt, main_language, chunk, openai_model):
+        extracted_segment_text = "Here are the segments of the page: " + str(chunk)
+        user_prompt = self.get_standard_textual_user_prompt(
+            texts=[main_language, extracted_segment_text],
+            openai_model=openai_model,
+        )
+        return self.get_response(
+            system_prompt=system_prompt,
+            user_prompt=user_prompt,
+            openai_model=openai_model,
+            is_only_textual=True,
+        )

     def make_h58_evaluation(
         self,
         main_language,
         other_textual_elements,
+        languages,
+        number_of_segments=10,
         openai_model=False,
     ):
         print("Using end_point:", self.end_point)
         print(
             "make_h58_evaluation - main_language:",
             main_language,
             "other_textual_elements:",
             other_textual_elements,
         )
+        # call the llm multiple times on aggregated segments to avoid token limit issues and to give a more segment-specific evaluation

         system_prompt = self.get_h58_system_prompt()

+        main_language = "The main language of the page is: " + str(
+            languages["main_page_lang"]
+        )

         mllm_responses = []
-        report = {
-            "mllm_response": "",
-        }
-        mllm_responses.append(report)
+        partial_segment_counter = 0
+        segment_counter_aggregation = 5  # number of segments to aggregate in a single prompt to the MLLM, to give a more segment-specific evaluation and avoid token limit issues
+        extracted_segment_chunk = ""
+
+        if number_of_segments == -1:  # if we want to evaluate all the segments
+            number_of_segments = len(languages["extracted_segments"])
+
+        for extracted_segment in languages["extracted_segments"][
+            0:number_of_segments
+        ]:
+            partial_segment_counter += 1
+
+            extracted_segment_chunk = (
+                extracted_segment_chunk + " " + str(extracted_segment)
+            )
+
+            if partial_segment_counter == segment_counter_aggregation:
+
+                mllm_response = self.execute_llm_call(
+                    system_prompt, main_language, extracted_segment_chunk, openai_model
+                )
+
+                mllm_responses.append({"mllm_response": mllm_response})
+
+                # Reset for next chunk
+                extracted_segment_chunk = ""
+                partial_segment_counter = 0
+
+        # --- LOGIC: Handle remaining segments ---
+        if extracted_segment_chunk.strip():
+            print("Processing remaining segments in the last chunk.")
+            mllm_response = self.execute_llm_call(
+                system_prompt, main_language, extracted_segment_chunk, openai_model
+            )
+            mllm_responses.append({"mllm_response": mllm_response})

         return mllm_responses

     def make_g88_evaluation(
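The loop above implements a chunk-and-flush pattern: aggregate segments in groups of `segment_counter_aggregation`, call the model once per full group, then flush whatever remains. A minimal standalone sketch of the same pattern (function name invented):

```python
# Aggregate items into groups of `size`, emitting each full group, then
# flush the remainder after the loop -- the same control flow as
# make_h58_evaluation above.
def chunked(items, size=5):
    chunk = []
    for item in items:
        chunk.append(item)
        if len(chunk) == size:
            yield chunk
            chunk = []
    if chunk:  # remaining segments in the last chunk
        yield chunk

segments = list(range(12))
print([len(c) for c in chunked(segments)])  # -> [5, 5, 2]
```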
@@ -311,7 +420,7 @@ class MLLMManager:
     ):

         system_prompt = self.get_g88_system_prompt()

         page_title = "The title of the page is: " + str(title_content["title"] + ". ")
         structural_content = (
             "Here is the content of the page (<main> tag, headings):"
@@ -412,7 +521,7 @@ def parse_mllm_alt_text_response(mllm_response):
     }


-def parse_mllm_standard_response(mllm_response):
+def parse_mllm_standard_response(mllm_response, extra_fields=[]):

     try:
         # Handle NaN or None values
@@ -423,13 +532,30 @@ def parse_mllm_standard_response(mllm_response, extra_fields=[]):
                 "evaluation_result": None,
             }
         # Extract JSON content between ```json and ``` markers
-        json_match = re.search(r"```json\s*(.*?)\s*```", mllm_response, re.DOTALL)
+        # json_match = re.search(r"```json\s*(.*?)\s*```", mllm_response, re.DOTALL)
+        json_match = re.search(
+            r"```json\s*(.*?)(?:\s*```|$)", mllm_response, re.DOTALL
+        )  # more robust regex to handle cases where the closing ``` is missing or there are extra spaces/newlines after the JSON content

         if not json_match:
+            print(
+                "MLLM response does not contain a JSON code block. Trying to extract JSON content without code block markers as a list."
+            )
             # Try to find JSON without markdown code blocks
-            json_match = re.search(r"\{.*\}", mllm_response, re.DOTALL)
+            json_match = re.search(r"\[.*\]", mllm_response, re.DOTALL)
+            print("new json_match:", json_match)
+        else:
+            print("MLLM response contains a JSON code block. Extracting JSON content.")

+        if not json_match:
+            print(
+                "MLLM response does not contain a JSON code block. Trying to extract JSON content without code block markers as {}."
+            )
+            json_match = re.search(r"\{.*\}", mllm_response, re.DOTALL)
+            print("new json_match 2:", json_match)

         if not json_match:
+            print("MLLM response does not contain any JSON content. Returning None.")
             return {
                 "assessment": None,
                 "judgment": None,
@@ -440,18 +566,53 @@ def parse_mllm_standard_response(mllm_response, extra_fields=[]):
             json_match.group(1) if "```json" in mllm_response else json_match.group(0)
         )

-        print("Extracted JSON string from MLLM response:", json_str)
+        json_str = json_str.replace("\\'", "'")
+        # print("Extracted JSON string from MLLM response:", json_str)

         # Parse the JSON string
         parsed_data = json.loads(json_str)
+        # print("Parsed MLLM response data:", parsed_data, type(parsed_data))

-        # Create a structured output with the key attributes
-        result = {
-            "assessment": parsed_data.get("Assessment", ""),
-            "judgment": parsed_data.get("Judgment", ""),
-            "evaluation_result": parsed_data.get("EvaluationResult", ""),
-        }
+        if isinstance(parsed_data, dict):
+            try:
+                # Create a structured output with the key attributes
+                result = {
+                    "assessment": parsed_data.get("Assessment", ""),
+                    "judgment": parsed_data.get("Judgment", ""),
+                    "evaluation_result": parsed_data.get("EvaluationResult", ""),
+                }
+                if extra_fields:
+                    for field in extra_fields:
+                        result[field] = parsed_data.get(field, "")
+            except Exception as e:
+                print(f"Error extracting fields from MLLM response: {e}")
+                result = {
+                    "assessment": None,
+                    "judgment": None,
+                    "evaluation_result": None,
+                }
+        elif isinstance(
+            parsed_data, list
+        ):  # in this case we have multiple segments evaluated in the same response, so we return an array of results, one for each segment
+            result = []
+            for item in parsed_data:
+                try:
+                    item_result = {
+                        "assessment": item.get("Assessment", ""),
+                        "judgment": item.get("Judgment", ""),
+                        "evaluation_result": item.get("EvaluationResult", ""),
+                    }
+                    if extra_fields:
+                        for field in extra_fields:
+                            item_result[field] = item.get(field, "")
+                except Exception as e:
+                    print(f"Error extracting fields from MLLM response item: {e}")
+                    item_result = {
+                        "assessment": None,
+                        "judgment": None,
+                        "evaluation_result": None,
+                    }
+                result.append(item_result)
         return result

     except json.JSONDecodeError as e:
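The new regex tolerates a missing closing fence, then falls back to a bare JSON array, then a bare object. A runnable sketch of that fallback chain (the helper name is invented; the backticks are spelled out only so this example stays readable):

```python
import re

FENCE = "`" * 3  # three backticks

def extract_json_payload(text):
    # Mirrors the fallback chain above: fenced json block (tolerating a
    # missing closing fence), then a bare JSON array, then a bare object.
    fenced = re.search(FENCE + r"json\s*(.*?)(?:\s*" + FENCE + "|$)", text, re.DOTALL)
    if fenced:
        return fenced.group(1)
    as_list = re.search(r"\[.*\]", text, re.DOTALL)
    if as_list:
        return as_list.group(0)
    as_dict = re.search(r"\{.*\}", text, re.DOTALL)
    return as_dict.group(0) if as_dict else None

print(extract_json_payload(FENCE + 'json\n{"Judgment": "success"}'))  # unterminated fence still matches
print(extract_json_payload('[{"Judgment": "warning"}] plus trailing text'))
```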
@@ -90,6 +90,8 @@ def disclaim_bool_string(value):
     if isinstance(value, str):
         if value == "True":
             return True
+        elif value == "Both":
+            return "Both"
         else:
             return False
     elif isinstance(value, bool):
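With this change the helper returns a tri-state: `True`, `False`, or the literal `"Both"` (which the routes use to fan out to two models). A self-contained sketch of how callers see it; the `bool` branch body is assumed here, since the diff only shows its header:

```python
def disclaim_bool_string(value):
    # Tri-state parser: "True" -> True, "Both" -> "Both", anything else -> False.
    if isinstance(value, str):
        if value == "True":
            return True
        elif value == "Both":
            return "Both"
        else:
            return False
    elif isinstance(value, bool):
        return value  # assumed pass-through for real booleans
    return False

for raw in ("True", "Both", "False", "nonsense"):
    print(raw, "->", disclaim_bool_string(raw))
```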
@@ -5,10 +5,13 @@ from pydantic import BaseModel
 import json
 from typing import Dict, List
 from datetime import datetime, timezone
+import aiofiles
+import asyncio

 from dependences.utils import (
     disclaim_bool_string,
     prepare_output_folder,
+    prepare_folder_path,
     create_folder,
     db_persistence_insert,
 )
@@ -55,30 +58,27 @@ class WCAGAltTextValuationRoutes:
         try:
             print("Received wcag alttext validation request.")
             json_content = json.loads(data.model_dump_json())
             mllm_model_id = self.mllm_settings["mllm_model_id"]

+            if self.mllm_settings["openai_model"] == "Both":
+                mllm_model_id_for_logging = (
+                    self.mllm_settings["mllm_model_id"]["model_id_remote"]
+                    + "&"
+                    + self.mllm_settings["mllm_model_id"]["model_id_local"]
+                )
+            else:
+                mllm_model_id_for_logging = self.mllm_settings["mllm_model_id"]

             # prepare output folders if needed ---
             images_output_dir = ""
             if (
                 disclaim_bool_string(json_content["save_elaboration"]) == True
                 or disclaim_bool_string(json_content["save_images"]) == True
             ):  # if something to save
-                url_path = (
-                    json_content["page_url"]
-                    .replace(":", "")
-                    .replace("//", "_")
-                    .replace("/", "_")
-                    .replace("%2", "_")
-                    .replace("?", "_")
-                    .replace("=", "_")
-                    .replace("&", "_")
-                )
-                url_path = url_path[:50]  # limit length
-                now = datetime.now(timezone.utc)
-                now_str = now.strftime("%Y_%m_%d-%H_%M_%S")
-                folder_str = mllm_model_id.replace(":", "-") + "_" + now_str
-                output_dir = prepare_output_folder(url_path, folder_str)
+                url_path, folder_str = prepare_folder_path(
+                    json_content, mllm_model_id_for_logging, tecnhnique_name="g94"
+                )
+                output_dir = prepare_output_folder(url_path, folder_str)
                 if disclaim_bool_string(json_content["save_images"]) == True:
                     images_output_dir = create_folder(
                         output_dir, directory_separator="/", next_path="images"
@@ -99,32 +99,86 @@ class WCAGAltTextValuationRoutes:
             # Extract images
             logging.info(f"Extracting images from: {json_content['page_url']}")
             images = await image_extractor.extract_images(
-                specific_images_urls=json_content["specific_images_urls"],extract_context=True
+                specific_images_urls=json_content["specific_images_urls"],
+                extract_context=True,
             )
-            # MLLM settings
-            mllm_end_point = self.mllm_settings["mllm_end_point"]
-            mllm_api_key = self.mllm_settings["mllm_api_key"]
-
-            logging.info("mllm_end_point:%s", mllm_end_point)
-            logging.info("mllm_model_id:%s", mllm_model_id)
-
-            # Create MLLM manager
-            mllm_manager = MLLMManager(mllm_end_point, mllm_api_key, mllm_model_id)
-            logging.info("mllm_manager.end_point:%s", mllm_manager.end_point)
-            # Make alt text evaluation
-            mllm_responses = mllm_manager.make_alt_text_evaluation(
-                images,
-                openai_model=self.mllm_settings["openai_model"],
-            )
-            # Parse MLLM responses
-            for i, response in enumerate(mllm_responses):
-                parsed_resp = parse_mllm_alt_text_response(response["mllm_response"])
-                mllm_responses[i]["mllm_response"] = parsed_resp
+            if self.mllm_settings["openai_model"] == "Both":

+                from concurrent.futures import ThreadPoolExecutor

-            mllm_responses_object = {
-                "mllm_alttext_assessments": mllm_responses,
-            }
+                def run_model_evaluation(endpoint, api_key, model_id, openai_model, label):
+                    manager = MLLMManager(endpoint, api_key, model_id)
+                    print(f"Using {label} model for alt text evaluation.", manager.end_point)
+                    logging.info("mllm_end_point:%s", endpoint)
+                    logging.info("mllm_model_id:%s", model_id)
+
+                    responses = manager.make_alt_text_evaluation(images, openai_model=openai_model)
+
+                    for i, response in enumerate(responses):
+                        responses[i]["mllm_response"] = parse_mllm_alt_text_response(response["mllm_response"])
+
+                    return responses
+
+                with ThreadPoolExecutor(max_workers=2) as executor:
+                    future_openai = executor.submit(
+                        run_model_evaluation,
+                        self.mllm_settings["mllm_end_point"]["model_end_point_remote"],
+                        self.mllm_settings["mllm_api_key"]["api_key_remote"],
+                        self.mllm_settings["mllm_model_id"]["model_id_remote"],
+                        True, "first remote"
+                    )
+                    future_local = executor.submit(
+                        run_model_evaluation,
+                        self.mllm_settings["mllm_end_point"]["model_end_point_local"],
+                        self.mllm_settings["mllm_api_key"]["api_key_local"],
+                        self.mllm_settings["mllm_model_id"]["model_id_local"],
+                        False, "second local"
+                    )
+
+                    mllm_responses_openai = future_openai.result()
+                    mllm_responses_local = future_local.result()
+
+                mllm_responses_object = {
+                    "mllm_alttext_assessments": {
+                        "mllm_alttext_assessments_openai": mllm_responses_openai,
+                        "mllm_alttext_assessments_local": mllm_responses_local,
+                    }
+                }
+            else:
+                # MLLM settings
+                mllm_end_point = self.mllm_settings["mllm_end_point"]
+                mllm_api_key = self.mllm_settings["mllm_api_key"]
+                mllm_model_id = self.mllm_settings["mllm_model_id"]
+
+                # Create MLLM manager
+                mllm_manager = MLLMManager(mllm_end_point, mllm_api_key, mllm_model_id)
+                print(
+                    "Using single model for alt text evaluation.",
+                    mllm_manager.end_point,
+                )
+
+                logging.info("mllm_end_point:%s", mllm_end_point)
+                logging.info("mllm_model_id:%s", mllm_model_id)
+                # Make alt text evaluation
+                mllm_responses = mllm_manager.make_alt_text_evaluation(
+                    images,
+                    openai_model=self.mllm_settings["openai_model"],
+                )
+                # Parse MLLM responses
+                for i, response in enumerate(mllm_responses):
+                    parsed_resp = parse_mllm_alt_text_response(
+                        response["mllm_response"]
+                    )
+                    mllm_responses[i]["mllm_response"] = parsed_resp
+
+                mllm_responses_object = {
+                    "mllm_alttext_assessments": mllm_responses,
+                }

             # common: prepare the object to return in the response
             returned_object = {
                 "images": images,
                 "mllm_validations": mllm_responses_object,
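The "Both" branch runs the remote and local evaluations concurrently because each `make_alt_text_evaluation` call blocks on HTTP. A minimal sketch of that thread-pool pattern (the fake call and delays are placeholders):

```python
from concurrent.futures import ThreadPoolExecutor
import time

# Two blocking calls submitted to a 2-worker pool overlap instead of
# running back to back -- the same shape as the diff's run_model_evaluation.
def fake_model_call(label, delay):
    time.sleep(delay)  # stand-in for the blocking LLM request
    return {"model": label, "assessment": "ok"}

with ThreadPoolExecutor(max_workers=2) as executor:
    future_remote = executor.submit(fake_model_call, "remote", 0.2)
    future_local = executor.submit(fake_model_call, "local", 0.2)
    results = [future_remote.result(), future_local.result()]

print(results)  # both complete in roughly one delay, not two
```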
@@ -139,7 +193,7 @@ class WCAGAltTextValuationRoutes:
                 connection_db=self.connection_db,
                 insert_type="wcag_alttext_validation",
                 page_url=json_content["page_url"],
-                llm_model=mllm_model_id,
+                llm_model=mllm_model_id_for_logging,
                 json_in_str=json_in_str,
                 json_out_str=json_out_str,
                 table="wcag_validator_results",
@@ -152,15 +206,25 @@ class WCAGAltTextValuationRoutes:
                 disclaim_bool_string(json_content["save_elaboration"]) == True
             ):  # Optionally save to JSON

-                await image_extractor.save_elaboration(
+                await image_extractor.save_elaboration(  # also save the extracted images info into a dedicated json file
                     images, output_dir=output_dir + "/extracted_images.json"
                 )

                 # save mllm responses
+                """
                 with open(
                     output_dir + "/mllm_alttext_assessments.json", "w", encoding="utf-8"
                 ) as f:
-                    json.dump(mllm_responses, f, indent=2, ensure_ascii=False)
+                    # json.dump(mllm_responses, f, indent=2, ensure_ascii=False)  # this was the version used in the first user test
+                    json.dump(mllm_responses_object, f, indent=2, ensure_ascii=False)"""
+
+                # async version
+                async with aiofiles.open(
+                    output_dir + "/mllm_alttext_assessments.json", "w", encoding="utf-8"
+                ) as f:
+                    await f.write(
+                        json.dumps(mllm_responses_object, indent=2, ensure_ascii=False)
+                    )

             return JSONResponse(content=returned_object, status_code=200)
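The synchronous `json.dump` was swapped for an `aiofiles` write so the FastAPI event loop is not blocked while saving. A self-contained sketch of that pattern (the path and payload are placeholders):

```python
import asyncio
import json
import aiofiles

# json.dumps builds the string first; aiofiles then writes it without
# blocking the event loop inside an async request handler.
async def save_assessments(payload, path="mllm_alttext_assessments.json"):
    async with aiofiles.open(path, "w", encoding="utf-8") as f:
        await f.write(json.dumps(payload, indent=2, ensure_ascii=False))

asyncio.run(save_assessments({"mllm_alttext_assessments": []}))
```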
@@ -22,7 +22,6 @@ unexpected_error_msg = "Unexpected Error: could not end the process"

 class WCAG_g88Valuation(BaseModel):
     page_url: str = "https://www.bbc.com"
-
     save_elaboration: str = "True"
@@ -52,16 +51,25 @@ class WCAG_g88ValuationRoutes:
         try:
             print("Received wcag G88 validation request.")
             json_content = json.loads(data.model_dump_json())
             mllm_model_id = self.mllm_settings["mllm_model_id"]

+            if self.mllm_settings["openai_model"] == "Both":
+                mllm_model_id_for_logging = (
+                    self.mllm_settings["mllm_model_id"]["model_id_remote"]
+                    + "&"
+                    + self.mllm_settings["mllm_model_id"]["model_id_local"]
+                )
+            else:
+                mllm_model_id_for_logging = self.mllm_settings["mllm_model_id"]

             # prepare output folders if needed ---
             images_output_dir = ""

             if (
                 disclaim_bool_string(json_content["save_elaboration"]) == True
             ):  # if something to save

                 url_path, folder_str = prepare_folder_path(
-                    json_content, mllm_model_id, tecnhnique_name="h58"
+                    json_content, mllm_model_id_for_logging, tecnhnique_name="g88"
                 )
                 output_dir = prepare_output_folder(url_path, folder_str)
@@ -76,26 +84,83 @@ class WCAG_g88ValuationRoutes:
             title_content = await title_content_extractor.extract_page_title()
             print("Extracted title_content.", title_content)

-            # MLLM settings
-            mllm_end_point = self.mllm_settings["mllm_end_point"]
-            mllm_api_key = self.mllm_settings["mllm_api_key"]
+            if self.mllm_settings["openai_model"] == "Both":

-            logging.info("mllm_end_point:%s", mllm_end_point)
-            logging.info("mllm_model_id:%s", mllm_model_id)
+                from concurrent.futures import ThreadPoolExecutor

-            # Create MLLM manager
-            mllm_manager = MLLMManager(mllm_end_point, mllm_api_key, mllm_model_id)
-            logging.info("mllm_manager.end_point:%s", mllm_manager.end_point)
-            # Make h88 evaluation
-            mllm_responses = mllm_manager.make_g88_evaluation(
-                title_content=title_content,
-                openai_model=self.mllm_settings["openai_model"],
-            )
-            parsed_mllm_responses = parse_mllm_standard_response(
-                mllm_responses["mllm_response"]
-            )
-            mllm_responses_object = {"mllm_g88_assessments": parsed_mllm_responses}
+                def run_model_evaluation(
+                    endpoint, api_key, model_id, openai_model, label
+                ):
+                    manager = MLLMManager(endpoint, api_key, model_id)
+                    print(
+                        f"Using {label} model for title evaluation.", manager.end_point
+                    )
+                    logging.info("mllm_end_point:%s", endpoint)
+                    logging.info("mllm_model_id:%s", model_id)
+
+                    responses = manager.make_g88_evaluation(
+                        title_content, openai_model=openai_model
+                    )
+
+                    parsed_mllm_responses = parse_mllm_standard_response(
+                        responses["mllm_response"]
+                    )
+                    return parsed_mllm_responses
+
+                with ThreadPoolExecutor(max_workers=2) as executor:
+                    future_openai = executor.submit(
+                        run_model_evaluation,
+                        self.mllm_settings["mllm_end_point"]["model_end_point_remote"],
+                        self.mllm_settings["mllm_api_key"]["api_key_remote"],
+                        self.mllm_settings["mllm_model_id"]["model_id_remote"],
+                        True,
+                        "first remote",
+                    )
+                    future_local = executor.submit(
+                        run_model_evaluation,
+                        self.mllm_settings["mllm_end_point"]["model_end_point_local"],
+                        self.mllm_settings["mllm_api_key"]["api_key_local"],
+                        self.mllm_settings["mllm_model_id"]["model_id_local"],
+                        False,
+                        "second local",
+                    )
+
+                    mllm_responses_openai = future_openai.result()
+                    mllm_responses_local = future_local.result()
+
+                mllm_responses_object = {
+                    "mllm_g88_assessments": {
+                        "mllm_g88_assessments_openai": mllm_responses_openai,
+                        "mllm_g88_assessments_local": mllm_responses_local,
+                    }
+                }
+
+            else:
+                # MLLM settings
+                mllm_end_point = self.mllm_settings["mllm_end_point"]
+                mllm_api_key = self.mllm_settings["mllm_api_key"]
+                mllm_model_id = self.mllm_settings["mllm_model_id"]
+
+                logging.info("mllm_end_point:%s", mllm_end_point)
+                logging.info("mllm_model_id:%s", mllm_model_id)
+
+                # Create MLLM manager
+                mllm_manager = MLLMManager(mllm_end_point, mllm_api_key, mllm_model_id)
+                print(
+                    "Using single model for g88 evaluation.",
+                    mllm_manager.end_point,
+                )
+                # Make g88 evaluation
+                mllm_responses = mllm_manager.make_g88_evaluation(
+                    title_content=title_content,
+                    openai_model=self.mllm_settings["openai_model"],
+                )
+                parsed_mllm_responses = parse_mllm_standard_response(
+                    mllm_responses["mllm_response"]
+                )
+                mllm_responses_object = {"mllm_g88_assessments": parsed_mllm_responses}

             # common: prepare the object to return in the response
             returned_object = {
                 "title_content": title_content,
                 "mllm_validations": mllm_responses_object,
@@ -111,7 +176,7 @@ class WCAG_g88ValuationRoutes:
                 connection_db=self.connection_db,
                 insert_type="wcag_g88_validation",
                 page_url=json_content["page_url"],
-                llm_model=mllm_model_id,
+                llm_model=mllm_model_id_for_logging,
                 json_in_str=json_in_str,
                 json_out_str=json_out_str,
                 table="wcag_validator_results",
|
|||
|
|
@ -3,6 +3,8 @@ from fastapi.responses import JSONResponse
|
|||
import logging
|
||||
from pydantic import BaseModel
|
||||
import json
|
||||
import aiofiles
|
||||
import asyncio
|
||||
|
||||
|
||||
from dependences.utils import (
|
||||
|
|
@@ -13,7 +15,7 @@ from dependences.utils import (
     db_persistence_insert,
 )
 from dependences.language_extractor import LanguageExtractor
-from dependences.mllm_management import MLLMManager, parse_mllm_alt_text_response
+from dependences.mllm_management import MLLMManager, parse_mllm_standard_response

 invalid_json_input_msg = "Invalid JSON format"
 unexpected_error_msg = "Unexpected Error: could not end the process"
@@ -21,12 +23,10 @@ unexpected_error_msg = "Unexpected Error: could not end the process"

 class WCAG_h58Valuation(BaseModel):
     page_url: str = "https://www.bbc.com"
-    #context_levels: int = 5
-    #pixel_distance_threshold: int = 200
-    #number_of_images: int = 10
-    #save_images: str = "True"
+    number_of_segments: int = 10
     save_elaboration: str = "True"
-    #specific_images_urls: List[str] = []
+    short_segments_length_threshold: int = 30
+    max_total_length: int = 15000


 class WCAG_h58ValuationRoutes:
@@ -55,92 +55,187 @@ class WCAG_h58ValuationRoutes:
         try:
             print("Received wcag H58 validation request.")
             json_content = json.loads(data.model_dump_json())
             mllm_model_id = self.mllm_settings["mllm_model_id"]

+            if self.mllm_settings["openai_model"] == "Both":
+                mllm_model_id_for_logging = (
+                    self.mllm_settings["mllm_model_id"]["model_id_remote"]
+                    + "&"
+                    + self.mllm_settings["mllm_model_id"]["model_id_local"]
+                )
+            else:
+                mllm_model_id_for_logging = self.mllm_settings["mllm_model_id"]

             # prepare output folders if needed ---
             if (
                 disclaim_bool_string(json_content["save_elaboration"]) == True
             ):  # if something to save

-                url_path, folder_str = prepare_folder_path(json_content, mllm_model_id, tecnhnique_name="h58")
+                url_path, folder_str = prepare_folder_path(
+                    json_content, mllm_model_id_for_logging, tecnhnique_name="h58"
+                )
                 output_dir = prepare_output_folder(url_path, folder_str)

             # Create lang extractor
             language_extractor = LanguageExtractor(
-                json_content["page_url"],
+                url=json_content["page_url"],
+                short_segments_length_threshold=json_content[
+                    "short_segments_length_threshold"
+                ],
+                max_total_length=json_content["max_total_length"],
             )
-            # Extract images
+            # Extract languages
             logging.info(f"Extracting languages from: {json_content['page_url']}")
             languages = await language_extractor.extract_content_with_lang_context()
             print("Extracted languages and textual elements.", languages)
-            main_language = "italian"
-            other_textual_elements = "ciao casa"

-            # MLLM settings
-            mllm_end_point = self.mllm_settings["mllm_end_point"]
-            mllm_api_key = self.mllm_settings["mllm_api_key"]
-
-            logging.info("mllm_end_point:%s", mllm_end_point)
-            logging.info("mllm_model_id:%s", mllm_model_id)
+            if self.mllm_settings["openai_model"] == "Both":
+                from concurrent.futures import ThreadPoolExecutor

-            # Create MLLM manager
-            mllm_manager = MLLMManager(mllm_end_point, mllm_api_key, mllm_model_id)
-            logging.info("mllm_manager.end_point:%s", mllm_manager.end_point)
-            # Make h58 evaluation
-            mllm_responses = mllm_manager.make_h58_evaluation(
-                main_language,
-                other_textual_elements,
-                openai_model=self.mllm_settings["openai_model"],
-            )
-            # Parse MLLM responses
-            for i, response in enumerate(mllm_responses):
-                parsed_resp = response["mllm_response"]  # parse_mllm_alt_text_response(response["mllm_response"])
-                mllm_responses[i]["mllm_response"] = parsed_resp
+                def run_model_evaluation(
+                    endpoint, api_key, model_id, openai_model, label
+                ):
+                    manager = MLLMManager(endpoint, api_key, model_id)
+                    print(
+                        f"Using {label} model for title evaluation.", manager.end_point
+                    )
+                    logging.info("mllm_end_point:%s", endpoint)
+                    logging.info("mllm_model_id:%s", model_id)

-            mllm_responses_object = {
-                "mllm_h58_assessments": mllm_responses,
-            }
+                    responses = manager.make_h58_evaluation(
+                        languages=languages,
+                        number_of_segments=json_content["number_of_segments"],
+                        openai_model=openai_model
+                    )
+
+                    mllm_respones_flattened = []
+                    for i, response in enumerate(responses):
+                        # print("response['mllm_response']:", response["mllm_response"])
+
+                        # because the response is a list of assessments for each segment, we need to parse each of them and flatten the result into a single list of assessments
+                        parsed_resp = parse_mllm_standard_response(
+                            response["mllm_response"],
+                            extra_fields=[
+                                "tag",
+                                "html",
+                                "detected_lang",
+                                "declared_lang",
+                            ],
+                        )
+
+                        mllm_respones_flattened.extend(parsed_resp)
+
+                    return mllm_respones_flattened
+
+                with ThreadPoolExecutor(max_workers=2) as executor:
+                    future_openai = executor.submit(
+                        run_model_evaluation,
+                        self.mllm_settings["mllm_end_point"]["model_end_point_remote"],
+                        self.mllm_settings["mllm_api_key"]["api_key_remote"],
+                        self.mllm_settings["mllm_model_id"]["model_id_remote"],
+                        True,
+                        "first remote",
+                    )
+                    future_local = executor.submit(
+                        run_model_evaluation,
+                        self.mllm_settings["mllm_end_point"]["model_end_point_local"],
+                        self.mllm_settings["mllm_api_key"]["api_key_local"],
+                        self.mllm_settings["mllm_model_id"]["model_id_local"],
+                        False,
+                        "second local",
+                    )
+
+                    mllm_responses_openai = future_openai.result()
+                    mllm_responses_local = future_local.result()
+
+                mllm_responses_object = {
+                    "mllm_h58_assessments": {
+                        "mllm_h58_assessments_openai": mllm_responses_openai,
+                        "mllm_h58_assessments_local": mllm_responses_local,
+                    }
+                }
+
+            else:
+                # MLLM settings
+                mllm_end_point = self.mllm_settings["mllm_end_point"]
+                mllm_api_key = self.mllm_settings["mllm_api_key"]
+                mllm_model_id = self.mllm_settings["mllm_model_id"]
+
+                logging.info("mllm_end_point:%s", mllm_end_point)
+                logging.info("mllm_model_id:%s", mllm_model_id)
+
+                # Create MLLM manager
+                mllm_manager = MLLMManager(mllm_end_point, mllm_api_key, mllm_model_id)
+                print(
+                    "Using single model for h58 evaluation.",
+                    mllm_manager.end_point,
+                )
+                # Make h58 evaluation
+                mllm_responses = mllm_manager.make_h58_evaluation(
+                    languages=languages,
+                    number_of_segments=json_content["number_of_segments"],
+                    openai_model=self.mllm_settings["openai_model"],
+                )
+                # Parse MLLM responses
+                # print("Raw MLLM responses:", mllm_responses)
+                mllm_respones_flattened = []
+                for i, response in enumerate(mllm_responses):
+                    # print("response['mllm_response']:", response["mllm_response"])
+
+                    # because the response is a list of assessments for each segment, we need to parse each of them and flatten the result into a single list of assessments
+                    parsed_resp = parse_mllm_standard_response(
+                        response["mllm_response"],
+                        extra_fields=["tag", "html", "detected_lang", "declared_lang"],
+                    )
+
+                    mllm_respones_flattened.extend(parsed_resp)
+
+                mllm_responses_object = {
+                    "mllm_h58_assessments": mllm_respones_flattened,
+                }

             # common: prepare the object to return in the response
             returned_object = {
                 "languages": languages,
                 "mllm_validations": mllm_responses_object,
             }

+            """
             try:
                 # Persist to local db
                 # Convert JSON data to string
-                json_in_str = json.dumps(images, ensure_ascii=False)
+                json_in_str = json.dumps(languages, ensure_ascii=False)
                 json_out_str = json.dumps(mllm_responses_object, ensure_ascii=False)
                 db_persistence_insert(
                     connection_db=self.connection_db,
-                    insert_type="wcag_alttext_validation",
+                    insert_type="wcag_h58_validation",
                     page_url=json_content["page_url"],
-                    llm_model=mllm_model_id,
+                    llm_model=mllm_model_id_for_logging,
                     json_in_str=json_in_str,
                     json_out_str=json_out_str,
                     table="wcag_validator_results",
                 )
             except Exception as e:
                 logging.error("error persisting to local db: %s", e)
+            """
             # save extracted images info

             if (
                 disclaim_bool_string(json_content["save_elaboration"]) == True
             ):  # Optionally save to JSON

-                await image_extractor.save_elaboration(
-                    images, output_dir=output_dir + "/extracted_images.json"
-                )
+                # await image_extractor.save_elaboration(
+                #     images, output_dir=output_dir + "/extracted_images.json"
+                # )

-                # save mllm responses
+                # save mllm input and responses
+                """
                 with open(
                     output_dir + "/mllm_assessments.json", "w", encoding="utf-8"
                 ) as f:
-                    json.dump(mllm_responses, f, indent=2, ensure_ascii=False)
+                    json.dump(returned_object, f, indent=2, ensure_ascii=False)"""

+                # async version
+                async with aiofiles.open(
+                    output_dir + "/mllm_assessments.json", "w", encoding="utf-8"
+                ) as f:
+                    await f.write(
+                        json.dumps(returned_object, indent=2, ensure_ascii=False)
+                    )

             return JSONResponse(content=returned_object, status_code=200)

         except json.JSONDecodeError:
@@ -74,17 +74,26 @@ def app_startup():
     connection_db = db_persistence_startup(table="wcag_validator_results")
     if disclaim_bool_string(return_from_env_valid("USE_OPENAI_MODEL", "False")) == True:
         openai_model = True
     elif disclaim_bool_string(return_from_env_valid("USE_OPENAI_MODEL", "False")) == False:
         openai_model = False
+    elif disclaim_bool_string(return_from_env_valid("USE_OPENAI_MODEL", "False")) == "Both":
+        openai_model = "Both"
     else:
         openai_model = False

-    if openai_model:
+    print("openai_model:", openai_model)
+    if openai_model == True:
         mllm_end_point = return_from_env_valid("MLLM_END_POINT_OPENAI", "")
         mllm_api_key = return_from_env_valid("MLLM_API_KEY_OPENAI", "")
         mllm_model_id = return_from_env_valid("MLLM_MODEL_ID_OPENAI", "")
+    elif openai_model == "Both":
+        mllm_end_point = {"model_end_point_remote": return_from_env_valid("MLLM_END_POINT_OPENAI", ""), "model_end_point_local": return_from_env_valid("MLLM_END_POINT_LOCAL", "")}
+        mllm_api_key = {"api_key_remote": return_from_env_valid("MLLM_API_KEY_OPENAI", ""), "api_key_local": return_from_env_valid("MLLM_API_KEY_LOCAL", "")}
+        mllm_model_id = {"model_id_remote": return_from_env_valid("MLLM_MODEL_ID_OPENAI", ""), "model_id_local": return_from_env_valid("MLLM_MODEL_ID_LOCAL", "")}
     else:
-        mllm_end_point = return_from_env_valid("MLLM_END_POINT_LOCAL", "")
-        mllm_api_key = return_from_env_valid("MLLM_API_KEY_LOCAL", "")
-        mllm_model_id = return_from_env_valid("MLLM_MODEL_ID_LOCAL", "")
+        mllm_end_point = return_from_env_valid("MLLM_END_POINT_LOCAL", "")
+        mllm_api_key = return_from_env_valid("MLLM_API_KEY_LOCAL", "")
+        mllm_model_id = return_from_env_valid("MLLM_MODEL_ID_LOCAL", "")

     print("mllm_end_point:", mllm_end_point)
     print("mllm_model_id:", mllm_model_id)
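In "Both" mode the startup code builds dict-valued settings keyed by `_remote`/`_local`. An illustrative environment for exercising that path; the env variable names match the keys read above, but every value here is a placeholder, not from the commit:

```python
import os

# Hypothetical "Both" configuration; endpoints, keys, and model ids are
# invented placeholders for illustration only.
os.environ.update({
    "USE_OPENAI_MODEL": "Both",
    "MLLM_END_POINT_OPENAI": "https://api.example-remote.test/v1",
    "MLLM_API_KEY_OPENAI": "sk-placeholder",
    "MLLM_MODEL_ID_OPENAI": "remote-model-id",
    "MLLM_END_POINT_LOCAL": "http://localhost:8000/v1",
    "MLLM_API_KEY_LOCAL": "none",
    "MLLM_MODEL_ID_LOCAL": "local-model-id",
})
# app_startup() would then produce dict-valued settings, e.g.
# mllm_model_id == {"model_id_remote": "remote-model-id", "model_id_local": "local-model-id"}
```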