# wcag_AI_validation/dependences/utils.py
#
# 252 lines
# 6.8 KiB
# Python

import json
import time
import urllib.request
import urllib.parse
import logging
import os
import requests
import base64
import sqlite3
from PIL import Image
import io
from datetime import datetime, timezone
# Shared %-style logging format for generic exception reports; passed to
# logging calls together with the exception as a lazy argument.
exception_msg = "Exception: %s"
def call_API_urlibrequest(
    data=None,
    verbose=False,
    url="",
    headers=None,
    method="post",
    base=2,  # number of seconds to wait
    max_tries=3,
):
    """Call a JSON HTTP API with urllib, retrying with exponential back-off.

    Args:
        data: Mapping sent as the URL query string for ``"get"`` or as a
            UTF-8 encoded JSON body for ``"post"``. Defaults to an empty dict.
        verbose: When true, log the input payload before the first attempt.
        url: Target URL.
        headers: Iterable of ``(name, value)`` pairs added to the request.
            Defaults to no extra headers.
        method: Either ``"get"`` or ``"post"`` (lower case).
        base: Base of the back-off; the wait after failed attempt ``i``
            (0-based) is ``base**i`` seconds.
        max_tries: Maximum number of attempts.

    Returns:
        The decoded JSON response on success.
        ``{"error_message": "method_not_allowed"}`` for an unsupported method.
        ``{"error": <last exception>}`` when every attempt failed, or
        ``{"error": "no_attempt_made"}`` when ``max_tries`` allows no attempt.
    """
    # None sentinels avoid sharing one mutable default object across calls.
    data = {} if data is None else data
    headers = [] if headers is None else headers

    if verbose:
        logging.info("input_data:%s", data)

    # Reject unsupported methods up front; retrying would never help.
    if method not in ("get", "post"):
        return {"error_message": "method_not_allowed"}

    # Allow multiple attempts to call the API in case of downtime.
    # Return the last error to the caller after max_tries failed attempts.
    wait_seconds = [base**i for i in range(max_tries)]
    # Defined before the loop so the final return is valid even if the loop
    # body never runs (max_tries <= 0).
    response_json = {"error": "no_attempt_made"}

    for num_tries in range(max_tries):
        try:
            if method == "get":
                # Encode the parameters and append them to the URL
                query_string = urllib.parse.urlencode(data)
                request = urllib.request.Request(
                    f"{url}?{query_string}", method="GET"
                )
            else:
                # Convert the dictionary to a JSON string and encode it to bytes
                data_to_send = json.dumps(data).encode("utf-8")
                request = urllib.request.Request(
                    url, data=data_to_send, method="POST"
                )
            for header in headers:
                request.add_header(header[0], header[1])

            # Send the request and capture the response
            with urllib.request.urlopen(request) as response:
                response_json = json.loads(response.read().decode("utf-8"))
                logging.info("response_json:%s", response_json)
                logging.info("response.status_code:%s", response.getcode())
                return response_json
        except Exception as e:
            logging.error("error message:%s", e)
            response_json = {"error": e}
            logging.info("num_tries:%s", num_tries)
            # Only wait when another attempt follows; sleeping after the
            # last failure would just delay returning the error.
            if num_tries < max_tries - 1:
                logging.info(
                    "Waiting %s seconds before automatically trying again.",
                    str(wait_seconds[num_tries]),
                )
                time.sleep(wait_seconds[num_tries])

    logging.info(
        "Tried %s times to make API call to get a valid response object", max_tries
    )
    logging.info("Returning provided response")
    return response_json
def disclaim_bool_string(value):
    """Normalise a boolean or its string form to a real ``bool``.

    Args:
        value: A ``bool`` or a ``str``.

    Returns:
        ``value`` itself when it is a ``bool``; for a string, ``True`` only
        when it is exactly ``"True"`` (case-sensitive) and ``False``
        otherwise; ``None`` for any other type.
    """
    if isinstance(value, bool):
        return value
    if isinstance(value, str):
        # Exact, case-sensitive match: "true" or " True" count as False.
        return value == "True"
    return None
def prepare_output_folder(file, now_str):
    """Create ``<cwd>/outputs/<file>_<now_str>`` and return its path.

    Args:
        file: Name prefix of the run-specific folder.
        now_str: Suffix appended after an underscore.

    Returns:
        The path of the deepest folder successfully prepared. If an
        ``Exception`` is raised along the way it is logged and whatever path
        was reached so far is returned (``""`` if nothing was created).
    """
    output_dir = ""
    try:
        # First level: the shared "outputs" folder in the working directory.
        output_dir = create_folder(os.getcwd(), "/", "outputs")
        # Second level: the folder specific to this file and timestamp.
        run_folder_name = file + "_" + now_str
        output_dir = create_folder(output_dir, "/", run_folder_name)
    except Exception as err:
        logging.error("error prepare output folder:%s", err)
    return output_dir
def prepare_folder_path(json_content, mllm_model_id, tecnhnique_name=""):
    """Derive a path-friendly URL fragment and a timestamped folder name.

    Args:
        json_content: Mapping holding the page address under ``"page_url"``.
        mllm_model_id: Model identifier; ``":"`` is replaced with ``"-"``.
        tecnhnique_name: Optional label placed between model id and timestamp.

    Returns:
        A ``(url_path, folder_str)`` tuple. ``url_path`` is the sanitised URL
        cut to at most 50 characters; ``folder_str`` is
        ``<model>_<technique>_<UTC timestamp>``.
    """
    # Applied strictly in this order: ":" is stripped first, so "://" becomes
    # "//" and is then collapsed into a single underscore.
    substitutions = (
        (":", ""),
        ("//", "_"),
        ("/", "_"),
        ("%2", "_"),
        ("?", "_"),
        ("=", "_"),
        ("&", "_"),
    )
    sanitized = json_content["page_url"]
    for old, new in substitutions:
        sanitized = sanitized.replace(old, new)
    url_path = sanitized[:50]  # limit length

    timestamp = datetime.now(timezone.utc).strftime("%Y_%m_%d-%H_%M_%S")
    folder_str = "_".join(
        (mllm_model_id.replace(":", "-"), tecnhnique_name, timestamp)
    )
    return url_path, folder_str
def create_folder(root_path, directory_separator, next_path):
    """Ensure the directory ``root_path + directory_separator + next_path`` exists.

    Args:
        root_path: Existing base path.
        directory_separator: Separator placed between the two parts.
        next_path: Folder to create under ``root_path``; it may contain
            several levels, missing intermediate folders are created too.

    Returns:
        The concatenated directory path.

    Raises:
        SystemExit: With exit code 1 when the directory cannot be created;
            the underlying error is logged first.
    """
    output_dir = root_path + directory_separator + next_path
    try:
        if not os.path.exists(output_dir):
            # makedirs builds missing parents as well, and exist_ok=True
            # tolerates another process creating the folder between the
            # existence check and the creation.
            os.makedirs(output_dir, exist_ok=True)
    except Exception as e:
        logging.error(exception_msg, e)
        # Raise SystemExit directly: the exit() helper is injected by the
        # site module for interactive use and is not guaranteed to exist.
        raise SystemExit(1)
    return output_dir
def encode_image_from_url(image_url, timeout=30):
    """Download an image and return it as a base64-encoded PNG string.

    Args:
        image_url: URL of the image to fetch.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` may block indefinitely on a stalled server.

    Returns:
        The image, converted to RGB and re-encoded as PNG, as a base64 text
        string.

    Raises:
        requests.RequestException: On connection problems, timeouts, or a
            4xx/5xx HTTP status.
    """
    response = requests.get(image_url, timeout=timeout)
    # Fail with a clear HTTP error instead of handing an error page to PIL.
    response.raise_for_status()

    # The context manager releases the image's resources once re-encoded.
    with Image.open(io.BytesIO(response.content)) as image:
        # Convert to RGB (handles RGBA, grayscale, etc.)
        rgb_image = image if image.mode == "RGB" else image.convert("RGB")
        # Save to bytes buffer
        buffer = io.BytesIO()
        rgb_image.save(buffer, format="PNG")  # or 'JPEG'

    # Encode to base64
    return base64.b64encode(buffer.getvalue()).decode("utf-8")
def db_persistence_startup(
    db_name_and_path="persistence/wcag_validator.db",
    table="wcag_validator_results",
):
    """Open the SQLite database and make sure the results table exists.

    A ``persistence`` folder is created under the current working directory
    before connecting.

    Args:
        db_name_and_path: Path of the SQLite database file.
        table: Name of the table to create if missing. It is concatenated
            into the SQL text, so it must come from a trusted source.

    Returns:
        The open ``sqlite3`` connection.

    On any failure the error is logged and the process exits with code 1.
    """
    try:
        create_folder(
            root_path=os.getcwd(),
            directory_separator="/",
            next_path="persistence",
        )
    except Exception as folder_err:
        logging.error("exception on db persistence startup:%s", folder_err)
        exit(1)

    # Column definitions of the table that stores the JSON data.
    table_columns = """ (
id INTEGER PRIMARY KEY AUTOINCREMENT,
insertion_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
insert_type TEXT,
page_url TEXT,
user TEXT,
llm_model TEXT,
json_input_data TEXT, json_output_data TEXT
)"""

    try:
        db_connection = sqlite3.connect(db_name_and_path)
        cursor = db_connection.cursor()
        create_statement = "CREATE TABLE IF NOT EXISTS " + table + table_columns
        cursor.execute(create_statement)
        db_connection.commit()
        logging.info("connection to the database established")
        return db_connection
    except Exception as db_err:
        logging.error("db_management problem:%s", db_err)
        exit(1)
def db_persistence_insert(
    connection_db,
    insert_type,
    page_url,
    user="",
    llm_model="",
    json_in_str="",
    json_out_str="",
    table="wcag_validator_results",
):
    """Insert one result row into the persistence table and commit.

    Args:
        connection_db: Open ``sqlite3`` connection.
        insert_type: Value for the ``insert_type`` column.
        page_url: Value for the ``page_url`` column.
        user: Value for the ``user`` column.
        llm_model: Value for the ``llm_model`` column.
        json_in_str: Serialized input stored in ``json_input_data``.
        json_out_str: Serialized output stored in ``json_output_data``.
        table: Target table name. It is concatenated into the SQL text, so
            it must come from a trusted source; the values are bound as
            parameters.

    Returns:
        None. This is best-effort: any exception is logged and swallowed.
    """
    column_list = (
        "insert_type,page_url,user,llm_model,json_input_data,json_output_data"
    )
    row_values = (insert_type, page_url, user, llm_model, json_in_str, json_out_str)
    try:
        cursor = connection_db.cursor()
        insert_sql = (
            "INSERT INTO " + table + " (" + column_list + ") VALUES (?,?,?,?,?,?)"
        )
        cursor.execute(insert_sql, row_values)
        connection_db.commit()
        logging.info(
            "Data correctly saved on local db table:%s, insertion type:%s",
            table,
            insert_type,
        )
    except Exception as err:
        logging.error("exception %s", err)
def return_from_env_valid(env_val, default_val):
    """Read an environment variable, looking its name up in upper case.

    Args:
        env_val: Variable name in any case; it is upper-cased before lookup.
        default_val: Value returned when the variable is not set.

    Returns:
        The variable's value, or ``default_val`` if it is absent.
    """
    # Env files conventionally use upper-case keys, so normalise the name.
    return os.getenv(env_val.upper(), default_val)