diff --git a/cosine.py b/cosine.py deleted file mode 100644 index 97fc5d7..0000000 --- a/cosine.py +++ /dev/null @@ -1,283 +0,0 @@ -# --- Import librerie --- -import pandas as pd -from openai import AzureOpenAI -import pickle -from sentence_transformers import SentenceTransformer -import numpy as np -import faiss -import openpyxl -import re -import json -from openpyxl.styles import PatternFill -from openpyxl import load_workbook -from collections import Counter -from prompts.prompt import build_prompt_local -import warnings -import logging - - -# --- Configurazione --- -endpoint = "https://gpt-sw-central-tap-security.openai.azure.com/" -deployment = "gpt-4o" -subscription_key = "8zufUIPs0Dijh0M6NpifkkDvxJHZMFtott7u8V8ySTYNcpYVoRbsJQQJ99BBACfhMk5XJ3w3AAABACOGr6sq" - -client = AzureOpenAI( - azure_endpoint=endpoint, - api_key=subscription_key, - api_version="2024-05-01-preview", -) - -# ----- Step 1: caricare datasets ----- -df_labeled = pd.read_csv("main/datasets/annotated_dataset.csv", encoding="cp1252", sep=";") -df_unlabeled = pd.read_csv("main/datasets/unlabeled_dataset.csv", sep="\t", encoding="utf-8") -print( - "***STEP 1***\nDataset etichettato caricato. Numero righe:", - len(df_labeled), - "\nDataset non etichettato caricato. Numero righe:", - len(df_unlabeled), -) - -def clean_id(x): - if pd.isna(x): - return "" - s = str(x) - m = re.search(r"\d+", s) - return m.group(0) if m else s.strip() - -df_labeled["automation_id"] = df_labeled["automation_id"].apply(clean_id) -df_unlabeled["automation_id"] = df_unlabeled["automation_id"].apply(clean_id) -df_labeled["folder"] = df_labeled["folder"].astype(str).str.strip() -df_unlabeled["folder"] = df_unlabeled["folder"].astype(str).str.strip() - -labeled_pairs = set(zip(df_labeled["automation_id"], df_labeled["folder"])) -df_unlabeled_filtered = df_unlabeled[ - ~df_unlabeled.apply(lambda row: (row["automation_id"], row["folder"]) in labeled_pairs, axis=1) -] -print("Automazioni non etichettate rimanenti dopo la pulizia:", len(df_unlabeled_filtered)) - - -# ----- Step 2: embeddings ----- -warnings.filterwarnings("ignore") -logging.getLogger("sentence_transformers").setLevel(logging.ERROR) -logging.getLogger("transformers").setLevel(logging.ERROR) -logging.getLogger("huggingface_hub").setLevel(logging.ERROR) - -print("\n***Step 2***\nEmbeddings") -model = SentenceTransformer("all-MiniLM-L6-v2") - -with open("main/labeled_embeddings.pkl", "rb") as f: - data = pickle.load(f) - -embeddings = data["embeddings"].astype("float32") -print("Shape embeddings:", embeddings.shape) - -# ⚠️ Cosine: normalizza i vettori -faiss.normalize_L2(embeddings) - - -# ----- Step 3: indice FAISS (Cosine via Inner Product) ----- -dimension = embeddings.shape[1] -index = faiss.IndexFlatIP(dimension) # inner product su vettori normalizzati = cosine similarity -index.add(embeddings) -print(f"\n***Step 3: Indice FAISS creato***.\nNumero di vettori nell'indice: {index.ntotal}") - - -# ----- Step 4: Retrieval (similarità cosine) ----- -k = 5 -output_rows = [] -df_sample = df_unlabeled_filtered.head(20).reset_index(drop=True) # SOLO prime 20 -llm_rows = [] - -def sim_label(sim: float) -> str: - # sim è cosine similarity (più alto = più simile) - if sim >= 0.85: - return "Match forte" - elif sim >= 0.70: - return "Match plausibile" - elif sim >= 0.55: - return "Similarità instabile" - else: - return "Debole" - -for count, (_, row) in enumerate(df_sample.iterrows(), start=1): - query_text = str(row["human_like"]) - print("numero corrente:", count) - - # embedding query + normalizzazione (cosine) - query_emb = model.encode([query_text], convert_to_numpy=True).astype("float32") - faiss.normalize_L2(query_emb) - - # search: ritorna cosine similarity (inner product) - sims, indices = index.search(query_emb, k) - - topk_cats = [] - top1_sim = float(sims[0][0]) - top1_similarity_label = sim_label(top1_sim) - - for rank in range(k): - idx = int(indices[0][rank]) - sim = float(sims[0][rank]) - - retrieved_row = df_labeled.iloc[idx] - topk_cats.append(str(retrieved_row.get("category", ""))) - - rank1_category = topk_cats[0] if topk_cats else "" - majority_category = Counter(topk_cats).most_common(1)[0][0] if topk_cats else "" - consistency = (sum(c == majority_category for c in topk_cats) / len(topk_cats)) if topk_cats else 0.0 - - # Salva analisi retrieval (opzionale) - for rank in range(k): - idx = int(indices[0][rank]) - sim = float(sims[0][rank]) - label = sim_label(sim) - - retrieved_row = df_labeled.iloc[idx] - - output_rows.append({ - "automazione da etichettare": query_text, - "rank": rank + 1, - "retrieved_idx": idx, - "automazione simile": retrieved_row.get("automation", ""), - "categoria automazione simile": retrieved_row.get("category", ""), - "similarita_cosine": sim, - "similarity_label": label, - - "rank1_similarity": top1_sim, - "rank1_similarity_label": top1_similarity_label, - "rank1_category": rank1_category, - "majority_category": majority_category, - "consistency": round(consistency, 3), - "top5_categories": " | ".join(topk_cats), - }) - - # ----- Step 5: invio dati al LLM ----- - # NB: build_prompt_local deve usare la colonna "similarity" (non "distance"). - retrieved = df_labeled.iloc[indices[0]].copy() - retrieved["similarity"] = sims[0].astype(float) - retrieved["similarity_label"] = retrieved["similarity"].apply(sim_label) - - # Se nel prompt vuoi anche un numero "confidence", puoi usare direttamente similarity - retrieved["confidence"] = retrieved["similarity"] - - prompt = build_prompt_local(query_text, retrieved, sim_label) - - resp = client.chat.completions.create( - model=deployment, - messages=[ - {"role": "system", "content": "Return ONLY valid JSON. No extra text."}, - {"role": "user", "content": prompt}, - ], - temperature=0, - ) - content = resp.choices[0].message.content.strip() - - try: - parsed = json.loads(content) - except Exception: - parsed = { - "automation": query_text, - "category": "", - "subcategory": "", - "problem_type": "", - "gravity": "", - "scores": {}, - "needs_human_review": True, - "short_rationale": f"JSON_PARSE_ERROR: {content[:200]}", - } - - # ----- Normalizzazione output LLM + final labels ----- - llm_category = str(parsed.get("category", "")).strip() - llm_subcategory = str(parsed.get("subcategory", "")).strip() - llm_problem_type = str(parsed.get("problem_type", "")).strip() - llm_gravity = str(parsed.get("gravity", "")).strip() - - # Regola deterministica HARMLESS - if llm_category.upper() == "HARMLESS": - llm_subcategory = "" - llm_problem_type = "none" - llm_gravity = "NONE" - - final_category = llm_category - final_subcategory = llm_subcategory - final_problem_type = llm_problem_type - final_gravity = llm_gravity - - # ----- HUMAN REVIEW LOGIC (su SIMILARITÀ, non distanza) ----- - needs_human_review = bool(parsed.get("needs_human_review", True)) - - # soglie cosine (da tarare) - OVERRIDE_MIN_SIMILARITY = 0.70 - OVERRIDE_MIN_CONSISTENCY = 0.60 - - aligned_strong = ( - final_category == majority_category - and final_category == rank1_category - and final_category != "" - ) - - good_retrieval = (top1_sim >= OVERRIDE_MIN_SIMILARITY) and (consistency >= OVERRIDE_MIN_CONSISTENCY) - - if aligned_strong and good_retrieval: - needs_human_review = False - - llm_rows.append({ - "automation_id": row.get("automation_id", ""), - "folder": row.get("folder", ""), - "automation_text": query_text, - - # Retrieval metrics (cosine) - "rank1_similarity": top1_sim, - "rank1_similarity_label": top1_similarity_label, - "rank1_category": rank1_category, - "majority_category": majority_category, - "consistency": round(consistency, 3), - "top5_categories": " | ".join(topk_cats), - - # LLM raw - "llm_category": llm_category, - "llm_subcategory": llm_subcategory, - "llm_problem_type": llm_problem_type, - "llm_gravity": llm_gravity, - "llm_needs_human_review": bool(parsed.get("needs_human_review", True)), - - # FINAL - "final_category": final_category, - "final_subcategory": final_subcategory, - "final_problem_type": final_problem_type, - "final_gravity": final_gravity, - "final_needs_human_review": needs_human_review, - - "llm_rationale": parsed.get("short_rationale", ""), - }) - - -# ----- Step 6: output Excel ----- -df_llm = pd.DataFrame(llm_rows) -out_path = "main/datasets/labeling_first20_cosine.xlsx" -df_llm.to_excel(out_path, index=False) - -wb = load_workbook(out_path) -ws = wb.active - -true_fill = PatternFill(start_color="FF6347", end_color="FF6347", fill_type="solid") # rosso -false_fill = PatternFill(start_color="90EE90", end_color="90EE90", fill_type="solid") # verde - -col_index = {cell.value: idx for idx, cell in enumerate(ws[1], start=1)} - -for col_name in ["llm_needs_human_review", "final_needs_human_review"]: - if col_name in col_index: - c = col_index[col_name] - for r in range(2, ws.max_row + 1): - val = ws.cell(row=r, column=c).value - if val is True: - ws.cell(row=r, column=c).fill = true_fill - elif val is False: - ws.cell(row=r, column=c).fill = false_fill - -wb.save(out_path) -print(f"\n***Step 6: Retrieval (cosine) + LLM ***\nExcel salvato in {out_path}") - -review_counts = df_llm["final_needs_human_review"].value_counts(dropna=False) -print("\n--- Needs human review summary (final) ---") -print(f"True : {review_counts.get(True, 0)}") -print(f"False: {review_counts.get(False, 0)}") \ No newline at end of file