# to launch: python build_dataset_from_folder.py --ref_path "" --push_to_hub --repo_id "nicolaleo/LLM-alt-text-assessment" --token ""

from datasets import Dataset, DatasetDict
import datasets
import json
from pathlib import Path
from PIL import Image
import hashlib
import urllib.parse
import argparse

'''
# Dataset metadata
_DESCRIPTION = """\
Dataset for image alt-text assessment and improvement using MLLM responses.
Contains images, original alt-texts, quality assessments, and improved versions.
"""

_CITATION = """\
@misc{alt_text_assessment,
  title={Alt-Text Assessment Dataset},
  year={2024}
}
"""


class AltTextDataset(datasets.GeneratorBasedBuilder):
    """Dataset for alt-text assessment with images and MLLM responses."""

    VERSION = datasets.Version("1.0.0")

    def _info(self):
        return datasets.DatasetInfo(
            description=_DESCRIPTION,
            features=datasets.Features({
                "image": datasets.Image(),
                "image_url": datasets.Value("string"),
                "alt_text": datasets.Value("string"),
                "original_alt_text_assessment": datasets.Value("string"),
                "assessment": datasets.Value("string"),
                "evaluation_result": datasets.Value("string"),
                "new_alt_text": datasets.Value("string"),
                # "source_folder": datasets.Value("string"),
            }),
            citation=_CITATION,
        )

    def _split_generators(self, dl_manager):
        """Define data splits."""
        return [
            datasets.SplitGenerator(
                name=datasets.Split.TRAIN,
                gen_kwargs={
                    "json_filepath": "data.json",
                    "images_dir": "images",
                },
            ),
        ]

    def _generate_examples(self, json_filepath, images_dir):
        """Generate examples from JSON file and image directory."""
        with open(json_filepath, encoding="utf-8") as f:
            data = json.load(f)

        images_path = Path(images_dir)

        for idx, entry in enumerate(data):
            image_url = entry["image_url"]
            image_filename = url_to_filename(image_url)
            image_path = images_path / image_filename

            # Load image if exists, otherwise None
            image = str(image_path) if image_path.exists() else None

            yield idx, {
                "image": image,
                "image_url": image_url,
                "alt_text": entry["alt_text"],
                "original_alt_text_assessment": entry["mllm_response"]["original_alt_text_assessment"],
                "assessment": entry["mllm_response"]["assessment"],
                "evaluation_result": entry["mllm_response"]["evaluation_result"],
                "new_alt_text": entry["mllm_response"]["new_alt_text"],
            }
'''

# ============================================================================
# SIMPLE USAGE FUNCTIONS
# ============================================================================


def url_to_filename(image_url):  # same naming step as in the image_extractor dependency
    """
    Convert an image URL to a sanitized filename, following the same logic
    used by the image extractor.

    Args:
        image_url: The image URL

    Returns:
        Sanitized filename with extension
    """
    # Parse the URL to get the path without query parameters
    parsed_url = urllib.parse.urlparse(image_url)
    url_path = parsed_url.path

    # Get the filename from the path
    filename = url_path.split("/")[-1]
    print(f"Original filename: '{filename}'")

    # Split filename and extension
    if "." in filename:
        image_name, ext = filename.rsplit(".", 1)
        ext = ext.lower()
    else:
        image_name = filename
        ext = "jpg"

    # Validate extension
    if ext not in ["jpg", "jpeg", "png", "gif", "webp"]:
        ext = "jpg"

    # Sanitize image name (remove special characters, limit length)
    image_name = "".join(c for c in image_name if c.isalnum() or c in ("-", "_"))
    image_name = image_name[:50]  # Limit filename length

    # If the name is empty after sanitization, create a hash-based name
    if not image_name:
        image_name = hashlib.md5(image_url.encode()).hexdigest()[:16]

    return f"{image_name}.{ext}"

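# Illustrative behaviour of url_to_filename (hypothetical URLs, not taken from any dataset):
#   url_to_filename("https://example.com/img/photo.PNG?w=200")  -> "photo.png"        (query dropped, extension lowered)
#   url_to_filename("https://example.com/assets/logo.svg")      -> "logo.jpg"         (unsupported extension falls back to jpg)
#   url_to_filename("https://example.com/")                     -> "<md5 prefix>.jpg" (hash fallback for empty names)
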
def push_to_hub_example(dataset_path="alt_text_merged_dataset", repo_id="", token=None):
    """
    Example of how to push the dataset to the Hugging Face Hub.
    You need to authenticate first!
    """
    from huggingface_hub import login

    print("\n=== Pushing Dataset to Hugging Face Hub ===")
    # Method 1: Login interactively (will prompt for a token)
    # login()

    # Method 2: Login with a token directly
    login(token=token)

    # Method 3: Set the token as an environment variable
    # export HF_TOKEN="hf_YourTokenHere"
    # Then login() will pick it up automatically

    # Load your dataset
    ds = load_dataset_from_disk(dataset_path)

    # Combine into a DatasetDict
    ds = DatasetDict(
        {
            "train": ds,
            # "test": test_dataset,
        }
    )

    # Push to the Hub (creates the repo if it doesn't exist)
    ds.push_to_hub(  # Automatically converted to Parquet when uploading to the Hub
        repo_id,  # e.g. "username/dataset-name"
        private=False,  # Set to True for a private dataset
    )

    print("Dataset pushed successfully!")
    print(f"View at: https://huggingface.co/datasets/{repo_id}")

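# Typical call (hypothetical repo id; requires a valid Hugging Face token):
#   push_to_hub_example("alt_text_merged_dataset", repo_id="username/dataset-name", token="hf_...")
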
def create_dataset_from_json(json_filepath, json_filepath_images, images_dir="images"):
    """
    Create a Hugging Face Dataset from a JSON file with local images.

    Args:
        json_filepath: Path to the JSON file with the MLLM alt-text assessments
        json_filepath_images: Path to the JSON file with the extracted image metadata
            (assumed to be parallel to json_filepath, entry by entry)
        images_dir: Directory containing the images (default: "images")

    Returns:
        datasets.Dataset object with images loaded
    """
    with open(json_filepath, "r", encoding="utf-8") as f:
        data = json.load(f)

    with open(json_filepath_images, "r", encoding="utf-8") as f:
        data_images = json.load(f)

    images_path = Path(images_dir)

    # Flatten the nested structure and load images
    flattened_data = {
        "image": [],
        "image_url": [],
        "alt_text": [],
        "original_alt_text_assessment": [],
        "assessment": [],
        "evaluation_result": [],
        "new_alt_text": [],
        "page_url": [],
        "html_context": [],
    }

    count_entry = 0
    for entry in data:
        if (
            entry["mllm_response"]["original_alt_text_assessment"] is None
        ):  # Important: entries without an MLLM response are not usable data
            print(
                f"Skipping entry with image URL: {entry['image_url']} due to missing MLLM response"
            )
            count_entry += 1
            continue  # Skip entries with no MLLM response
        image_url = entry["image_url"]
        image_filename = url_to_filename(image_url)
        image_path = images_path / image_filename

        # Load the image if it exists
        if image_path.exists():
            img = Image.open(image_path)
            flattened_data["image"].append(img)
        else:
            print(f"Warning: Image not found: {image_path}")
            flattened_data["image"].append(None)

        flattened_data["image_url"].append(image_url)
        flattened_data["alt_text"].append(entry["alt_text"])
        flattened_data["original_alt_text_assessment"].append(
            str(entry["mllm_response"]["original_alt_text_assessment"])
        )
        flattened_data["assessment"].append(entry["mllm_response"]["assessment"])
        flattened_data["evaluation_result"].append(
            entry["mllm_response"]["evaluation_result"]
        )
        flattened_data["new_alt_text"].append(entry["mllm_response"]["new_alt_text"])
        # data_images is indexed positionally, so it must stay aligned with data
        flattened_data["page_url"].append(data_images[count_entry]["page_url"])
        flattened_data["html_context"].append(data_images[count_entry]["html_context"])

        count_entry += 1

    print(f"Total valid entries loaded: {len(flattened_data['image_url'])}")
    return datasets.Dataset.from_dict(flattened_data)

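# Assumed shape of the two JSON files, reconstructed from the keys accessed above
# (values are illustrative):
#
# mllm_alttext_assessments.json:
#   [
#     {
#       "image_url": "https://example.com/img/photo.png",
#       "alt_text": "original alt text",
#       "mllm_response": {
#         "original_alt_text_assessment": 2,
#         "assessment": "success",
#         "evaluation_result": "...",
#         "new_alt_text": "improved alt text"
#       }
#     },
#     ...
#   ]
#
# extracted_images.json (parallel to the file above, entry by entry):
#   [
#     {"page_url": "https://example.com/page.html", "html_context": "<figure>...</figure>"},
#     ...
#   ]
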
def create_dataset_from_folders(
    ref_path,
    json_filename="mllm_alttext_assessments.json",
    json_filename_images="extracted_images.json",
    images_dirname="images",
):
    """
    Create a merged dataset from multiple folders under ref_path.
    Each folder should contain the two JSON files and an images subdirectory.

    Args:
        ref_path: Root path containing multiple folders
        json_filename: Name of the assessment JSON file in each folder
            (default: "mllm_alttext_assessments.json")
        json_filename_images: Name of the extracted-images JSON file in each folder
            (default: "extracted_images.json")
        images_dirname: Name of the images subdirectory (default: "images")

    Returns:
        datasets.Dataset object with all entries merged
    """
    ref_path = Path(ref_path)
    all_datasets = []

    # Find all subdirectories containing the JSON files
    folders_processed = 0

    for folder in ref_path.iterdir():
        if not folder.is_dir():
            continue

        json_path = folder / json_filename
        json_path_images = folder / json_filename_images
        images_path = folder / images_dirname

        # Check that both JSON files exist; the images directory is optional
        if not json_path.exists():
            print(f"Skipping {folder.name}: no {json_filename} found")
            continue

        if not json_path_images.exists():
            print(f"Skipping {folder.name}: no {json_filename_images} found")
            continue

        if not images_path.exists():
            print(f"Warning: {folder.name}: images directory not found")
            # continue
            # Continue anyway, images might be optional (URLs only)

        print(f"Processing folder: {folder.name}")

        try:
            # Create a dataset for this folder
            ds = create_dataset_from_json(
                str(json_path), str(json_path_images), str(images_path)
            )
            all_datasets.append(ds)

            folders_processed += 1
            print(f"  -> Loaded {len(ds)} entries")
        except Exception as e:
            print(f"Error processing {folder.name}: {e}")
            continue

    if not all_datasets:
        raise ValueError(f"No valid folders found in {ref_path}")

    # Merge all datasets
    print(f"\n=== Merging {folders_processed} folders ===")
    merged_dataset = datasets.concatenate_datasets(all_datasets)
    print(f"Total entries: {len(merged_dataset)}")

    return merged_dataset

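# Expected layout under ref_path (folder names are illustrative; file names follow
# the defaults above):
#
#   ref_path/
#     crawl_run_01/
#       mllm_alttext_assessments.json
#       extracted_images.json
#       images/
#         photo.png
#         ...
#     crawl_run_02/
#       ...
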
def verify_images(json_filepath, images_dir="images"):
    """
    Verify that all images referenced in the JSON exist in the images directory.

    Args:
        json_filepath: Path to JSON file
        images_dir: Directory containing images

    Returns:
        Dict with 'found', 'missing', 'total', and 'details' keys
    """
    with open(json_filepath, "r", encoding="utf-8") as f:
        data = json.load(f)

    images_path = Path(images_dir)

    found = []
    missing = []

    for entry in data:
        image_url = entry["image_url"]
        image_filename = url_to_filename(image_url)
        image_path = images_path / image_filename
        print(
            "image_url:",
            image_url,
            "image_filename:",
            image_filename,
            "image_path:",
            image_path,
        )

        if image_path.exists():
            found.append(
                {"url": image_url, "filename": image_filename, "path": str(image_path)}
            )
        else:
            missing.append(
                {
                    "url": image_url,
                    "filename": image_filename,
                    "expected_path": str(image_path),
                }
            )

    return {
        "found": len(found),
        "missing": len(missing),
        "total": len(data),
        "details": {"found_images": found, "missing_images": missing},
    }

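# Example (hypothetical paths):
#   report = verify_images("crawl_run_01/mllm_alttext_assessments.json", "crawl_run_01/images")
#   print(f"{report['found']}/{report['total']} images present, {report['missing']} missing")
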
def verify_images_in_folders(
    ref_path, json_filename="mllm_alttext_assessments.json", images_dirname="images"
):
    """
    Verify images across all folders under ref_path.

    Args:
        ref_path: Root path containing multiple folders
        json_filename: Name of the JSON file in each folder
        images_dirname: Name of the images subdirectory

    Returns:
        Dict with aggregated verification results
    """
    ref_path = Path(ref_path)
    total_found = 0
    total_missing = 0
    total_entries = 0
    folder_results = {}

    for folder in ref_path.iterdir():
        if not folder.is_dir():
            continue

        json_path = folder / json_filename
        images_path = folder / images_dirname

        if not json_path.exists():
            continue

        print(f"Verifying folder: {folder.name}")

        try:
            verification = verify_images(str(json_path), str(images_path))
            folder_results[folder.name] = verification

            total_found += verification["found"]
            total_missing += verification["missing"]
            total_entries += verification["total"]

            print(f"  Found: {verification['found']}/{verification['total']}")

        except Exception as e:
            print(f"  Error: {e}")
            continue

    return {
        "found": total_found,
        "missing": total_missing,
        "total": total_entries,
        "folders": folder_results,
    }

def save_dataset(dataset, output_path):
    """Save the dataset in Arrow format (includes images)."""
    dataset.save_to_disk(output_path)
    # print(f"Dataset saved to {output_path}")

    # Or save as JSON
    # dataset.to_json(f"{output_path}/data.json")

    # Or save as CSV
    # dataset.to_csv(f"{output_path}/data.csv")

    # Or save as Parquet
    # dataset.to_parquet(f"{output_path}/data.parquet")


def load_dataset_from_disk(dataset_path):
    """Load a previously saved dataset."""
    return datasets.load_from_disk(dataset_path)


# ============================================================================
# EXAMPLE USAGE
# ============================================================================

if __name__ == "__main__":

    parser = argparse.ArgumentParser()

    parser.add_argument(
        "--ref_path",
        type=str,
        help="Root path containing multiple folders",
        default="",
    )

    parser.add_argument(
        "--push_to_hub",
        action="store_true",
        default=False,
        help="If set, push the merged dataset to the Hugging Face Hub",
    )
    parser.add_argument(
        "--token",
        type=str,
        help="Hugging Face authentication token",
        default="",
    )
    parser.add_argument(
        "--repo_id",
        type=str,
        help="Hugging Face repository ID",
        default="nicolaleo/LLM-alt-text-assessment",
    )
    args = parser.parse_args()

    # Example 1: Verify images across all folders
    print("=== Verifying Images in All Folders ===")
    verification = verify_images_in_folders(args.ref_path)
    print("\n######## Verifier output ################################")
    print(f"Total Found: {verification['found']}/{verification['total']}")
    print(f"Total Missing: {verification['missing']}/{verification['total']}")
    print("########################################")

    # Show per-folder breakdown
    print("\n=== Per-Folder Breakdown ===")
    for folder_name, results in verification["folders"].items():
        print(f"{folder_name}: {results['found']}/{results['total']} images found")

    # Example 2: Create merged dataset from all folders
    print("\n=== Creating Merged Dataset ===")
    ds = create_dataset_from_folders(args.ref_path)
    print("\n######## Merged Dataset output ################################")
    print(f"Final dataset size: {len(ds)} entries")
    print("########################################")

    # Example 3: Analyze the merged dataset
    print("\n=== Dataset Analysis ===")
    print(ds)

    # Example 4: Access images and data
    print("\n=== First Example ===")
    first_example = ds[0]
    print(f"Image URL: {first_example['image_url']}")
    print(f"Alt text: {first_example['alt_text']}")
    print(f"Assessment: {first_example['assessment']}")
    print(f"New alt text: {first_example['new_alt_text']}")
    print(f"Image loaded: {first_example['image'] is not None}")

    if first_example["image"] is not None:
        img = first_example["image"]
        print(f"Image size: {img.size}")
        # img.show()  # Uncomment to display the image

    # Example 5: Filter and work with the merged data
    print("\n=== Filtering Merged Dataset ===")
    successful = ds.filter(lambda x: x["assessment"] == "success")
    print(f"Successful assessments: {len(successful)}")

    high_rated = ds.filter(lambda x: int(x["original_alt_text_assessment"]) >= 4)
    print(f"High-rated (>=4): {len(high_rated)}")

    # Example 6: Save the merged dataset
    print("\n=== Saving Merged Dataset ===")
    save_dataset(ds, "alt_text_merged_dataset")

    # Example 7: Load the dataset
    print("\n=== Loading Dataset ===")
    loaded_ds = load_dataset_from_disk("alt_text_merged_dataset")
    print(f"Loaded {len(loaded_ds)} entries")

    if args.push_to_hub:
        # Push to the Hugging Face Hub (optional)
        push_to_hub_example(repo_id=args.repo_id, token=args.token)  # see push_to_hub_example above for details