| import os |
| import shutil |
| import json |
| from collections import defaultdict |
| import random |
| from tqdm import tqdm |
| from PIL import Image |
|
|
def convert_coco_to_yolo(coco_json_path, images_dir, output_dir, class_map, split='train'):
    """Convert COCO polygon annotations to YOLO segmentation label files.

    Every image that has at least one annotation in the COCO file and exists
    in ``images_dir`` is copied to ``<output_dir>/images/<split>/``, and a
    label file with the same stem is written to
    ``<output_dir>/labels/<split>/``. Each label line has the form
    ``<class> x1 y1 x2 y2 ...`` where x values are divided by the image width
    and y values by the image height. Images without annotations are not
    processed.

    Args:
        coco_json_path: Path to the COCO annotation JSON file.
        images_dir: Directory containing the source images.
        output_dir: Root directory of the YOLO dataset to write.
        class_map: Mapping from COCO ``category_id`` to YOLO class index.
            Annotations whose category is not in the mapping are skipped
            with a warning.
        split: Name of the split sub-directory (for example ``'train'``).

    Returns:
        The set of COCO image ids that were converted. An empty set is
        returned when the JSON file or image directory is missing, or when
        the JSON file cannot be parsed or lacks the expected top-level keys.
    """
    if not os.path.exists(coco_json_path):
        print(f"Warning: JSON file not found: {coco_json_path}")
        return set()

    if not os.path.exists(images_dir):
        print(f"Warning: Images directory not found: {images_dir}")
        return set()

    print(f"\nProcessing {split} split...")

    labels_dir = os.path.join(output_dir, 'labels', split)
    images_dir_out = os.path.join(output_dir, 'images', split)
    os.makedirs(labels_dir, exist_ok=True)
    os.makedirs(images_dir_out, exist_ok=True)

    try:
        with open(coco_json_path, 'r', encoding='utf-8') as f:
            coco = json.load(f)
    except (json.JSONDecodeError, UnicodeDecodeError):
        print(f"Error: Invalid JSON file: {coco_json_path}")
        return set()

    # A syntactically valid JSON file can still lack the COCO structure;
    # report it like any other bad input instead of raising KeyError.
    if not isinstance(coco, dict) or 'images' not in coco or 'annotations' not in coco:
        print(f"Error: Missing 'images' or 'annotations' in JSON file: {coco_json_path}")
        return set()

    id_to_filename = {img['id']: img['file_name'] for img in coco['images']}

    # Group annotations by the image they belong to.
    img_to_anns = defaultdict(list)
    for ann in coco['annotations']:
        img_to_anns[ann['image_id']].append(ann)

    processed_images = set()
    for img_id, anns in tqdm(img_to_anns.items(), desc=f"Converting {split} set"):
        img_file = id_to_filename.get(img_id)
        if img_file is None:
            print(f"Warning: Annotation references unknown image ID {img_id}, skipping...")
            continue

        img_path = os.path.join(images_dir, img_file)
        if not os.path.exists(img_path):
            print(f"Warning: Image {img_path} not found, skipping...")
            continue

        try:
            # Read the size before copying anything, so an unreadable image
            # does not end up in the output without a label file.
            with Image.open(img_path) as im:
                w, h = im.size

            label_lines = []
            for ann in anns:
                cat_id = ann['category_id']
                if cat_id not in class_map:
                    print(f"Warning: Unknown category ID {cat_id} in {img_file}")
                    continue
                yolo_cls = class_map[cat_id]

                segmentation = ann.get('segmentation')
                if not isinstance(segmentation, list):
                    # Only polygon lists can be turned into coordinate lines;
                    # anything else (e.g. an RLE dict) is skipped.
                    print(f"Warning: Non-polygon segmentation in {img_file}, skipping annotation")
                    continue

                for seg in segmentation:
                    if len(seg) < 6:
                        # Fewer than three (x, y) pairs is not a polygon.
                        print(f"Warning: Degenerate polygon in {img_file}, skipping")
                        continue
                    # Even positions are x (scaled by width), odd are y (by height).
                    coords = [str(x / w) if i % 2 == 0 else str(x / h) for i, x in enumerate(seg)]
                    label_lines.append(f"{yolo_cls} {' '.join(coords)}")

            shutil.copy2(img_path, os.path.join(images_dir_out, img_file))

            label_file = os.path.join(labels_dir, os.path.splitext(img_file)[0] + '.txt')
            with open(label_file, 'w', encoding='utf-8') as f:
                f.write('\n'.join(label_lines))

            processed_images.add(img_id)

        except (IOError, OSError) as e:
            print(f"Error processing {img_file}: {str(e)}")
            continue

    return processed_images
|
|
def create_balanced_dataset(source_json, images_dir, output_dir, class_map, min_samples=50, split='train', seed=None):
    """Write a per-category sampled YOLO segmentation subset of a COCO file.

    For each category present in ``class_map``, up to ``min_samples`` images
    containing that category are drawn at random. The union of the drawn
    images is copied to ``<output_dir>/images/<split>/`` and a label file per
    image is written to ``<output_dir>/labels/<split>/``, one line per
    polygon: ``<class> x1 y1 x2 y2 ...`` with x divided by image width and y
    by image height.

    The result is only approximately balanced: a category with fewer than
    ``min_samples`` images contributes all of them, and an image drawn for
    one category keeps its annotations of every other known category.

    Args:
        source_json: Path to the COCO annotation JSON file.
        images_dir: Directory containing the source images.
        output_dir: Root directory of the YOLO dataset to write.
        class_map: Mapping from COCO ``category_id`` to YOLO class index.
            Annotations of other categories are ignored with a warning.
        min_samples: Maximum number of images drawn per category (the name
            is kept for backward compatibility).
        split: Name of the split sub-directory.
        seed: Optional seed for reproducible sampling. When ``None`` the
            module-level ``random`` generator is used, so a caller's
            ``random.seed(...)`` still takes effect.

    Returns:
        None. A missing or unparsable JSON file is reported with a message
        and nothing is converted.
    """
    print(f"\nCreating balanced dataset for {split} split...")

    labels_dir = os.path.join(output_dir, 'labels', split)
    images_dir_out = os.path.join(output_dir, 'images', split)
    os.makedirs(labels_dir, exist_ok=True)
    os.makedirs(images_dir_out, exist_ok=True)

    try:
        with open(source_json, 'r', encoding='utf-8') as f:
            coco = json.load(f)
    except FileNotFoundError:
        print(f"Warning: JSON file not found: {source_json}")
        return
    except (json.JSONDecodeError, UnicodeDecodeError):
        print(f"Error: Invalid JSON file: {source_json}")
        return

    if not isinstance(coco, dict) or 'images' not in coco or 'annotations' not in coco:
        print(f"Error: Missing 'images' or 'annotations' in JSON file: {source_json}")
        return

    # Group image ids by category and annotations by image, keeping only
    # categories that can actually be mapped to a YOLO class.
    images_by_part = defaultdict(set)
    image_to_anns = defaultdict(list)
    unknown_categories = set()
    for ann in coco['annotations']:
        cat_id = ann['category_id']
        if cat_id not in class_map:
            unknown_categories.add(cat_id)
            continue
        img_id = ann['image_id']
        images_by_part[cat_id].add(img_id)
        image_to_anns[img_id].append(ann)

    for cat_id in sorted(unknown_categories, key=str):
        print(f"Warning: Unknown category ID {cat_id}, skipping its annotations")

    # Sample from a sorted list so a given seed always yields the same images,
    # independent of set iteration order.
    rng = random if seed is None else random.Random(seed)
    selected_images = set()
    for part_images in images_by_part.values():
        sample_size = min(min_samples, len(part_images))
        selected_images.update(rng.sample(sorted(part_images, key=str), sample_size))

    id_to_filename = {img['id']: img['file_name'] for img in coco['images']}

    print(f"Processing {len(selected_images)} images for balanced {split} set...")
    if not selected_images:
        return

    for img_id in tqdm(sorted(selected_images, key=str)):
        img_file = id_to_filename.get(img_id)
        if img_file is None:
            print(f"Warning: Annotation references unknown image ID {img_id}, skipping...")
            continue

        img_path = os.path.join(images_dir, img_file)
        if not os.path.exists(img_path):
            print(f"Warning: Image {img_path} not found, skipping...")
            continue

        try:
            # Read the size before copying, so an unreadable image is not
            # left in the output without a label file.
            with Image.open(img_path) as im:
                w, h = im.size

            label_lines = []
            for ann in image_to_anns[img_id]:
                yolo_cls = class_map[ann['category_id']]

                segmentation = ann.get('segmentation')
                if not isinstance(segmentation, list):
                    # Only polygon lists can be turned into coordinate lines.
                    print(f"Warning: Non-polygon segmentation in {img_file}, skipping annotation")
                    continue

                for seg in segmentation:
                    if len(seg) < 6:
                        # Fewer than three (x, y) pairs is not a polygon.
                        print(f"Warning: Degenerate polygon in {img_file}, skipping")
                        continue
                    # Even positions are x (scaled by width), odd are y (by height).
                    coords = [str(x / w) if i % 2 == 0 else str(x / h) for i, x in enumerate(seg)]
                    label_lines.append(f"{yolo_cls} {' '.join(coords)}")

            shutil.copy2(img_path, os.path.join(images_dir_out, img_file))

            label_file = os.path.join(labels_dir, os.path.splitext(img_file)[0] + '.txt')
            with open(label_file, 'w', encoding='utf-8') as f:
                f.write('\n'.join(label_lines))

        except (IOError, OSError) as e:
            print(f"Error processing {img_file}: {str(e)}")
            continue
|
|
def main():
    """Convert the car damage and car parts COCO datasets to YOLO layout.

    The source data is read from ``damage_detection_dataset`` in the parent of
    the directory that contains this file; output is written beneath
    ``data/data_yolo_for_training`` in that same parent directory. Missing
    inputs are reported with a message and skipped.
    """
    project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    source_dir = os.path.join(project_root, 'damage_detection_dataset')
    if not os.path.exists(source_dir):
        print(f"Error: Source directory not found: {source_dir}")
        return

    # Output roots for the two YOLO datasets.
    yolo_root = os.path.join(project_root, 'data', 'data_yolo_for_training')
    car_damage_dir = os.path.join(yolo_root, 'car_damage_dataset')
    car_parts_dir = os.path.join(yolo_root, 'car_parts_damage_dataset')

    # COCO category id -> YOLO class index.
    damage_class_map = {1: 0}
    parts_class_map = {1: 0, 2: 1, 3: 2, 4: 3, 5: 4}

    # --- Damage dataset: plain conversion of every split. ---
    print("\nProcessing Car Damage Dataset...")
    for split_name in ('train', 'val', 'test'):
        # NOTE(review): 'test' reuses the val-named annotation file inside the
        # test directory -- confirm this matches the dataset layout.
        if split_name == 'train':
            annos_name = 'COCO_train_annos.json'
        else:
            annos_name = 'COCO_val_annos.json'
        split_dir = os.path.join(source_dir, split_name)
        annos_path = os.path.join(split_dir, annos_name)

        if not os.path.exists(annos_path):
            print(f"Warning: JSON file not found for {split_name} split: {annos_path}")
            continue
        convert_coco_to_yolo(annos_path, split_dir, car_damage_dir, damage_class_map, split_name)

    # --- Parts dataset: sampled training split, plain val/test conversion. ---
    print("\nProcessing Car Parts Dataset...")

    train_dir = os.path.join(source_dir, 'train')
    train_json = os.path.join(train_dir, 'COCO_mul_train_annos.json')
    if not os.path.exists(train_json):
        print(f"Warning: Training JSON file not found: {train_json}")
    else:
        create_balanced_dataset(
            train_json,
            train_dir,
            car_parts_dir,
            parts_class_map,
            min_samples=50,
            split='train',
        )

    for split_name in ('val', 'test'):
        split_dir = os.path.join(source_dir, split_name)
        annos_path = os.path.join(split_dir, 'COCO_mul_val_annos.json')

        if not os.path.exists(annos_path):
            print(f"Warning: JSON file not found for {split_name} split: {annos_path}")
            continue
        convert_coco_to_yolo(annos_path, split_dir, car_parts_dir, parts_class_map, split_name)
|
# Run the conversion only when this file is executed as a script.
if __name__ == '__main__':
    main()
|
|