import json
import logging
import math
import re

import datasets
import requests
from datasets import load_dataset, get_dataset_config_names, get_dataset_infos
from huggingface_hub import HfApi, DatasetCard, DatasetCardData

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class DatasetCommandCenter:
    def __init__(self, token=None):
        self.token = token
        self.api = HfApi(token=token)
        self.username = self.api.whoami()["name"]
        logger.info(f"Authenticated as {self.username}")

    def get_dataset_metadata(self, dataset_id):
        """
        Fetches the configs, splits, and license for a dataset, falling
        back to sensible defaults when the Hub metadata is unavailable.
        """
        configs = ['default']
        splits = ['train', 'test', 'validation']
        license_name = "unknown"

        try:
            # Config names: keep the 'default' fallback if discovery fails.
            try:
                found_configs = get_dataset_config_names(dataset_id, token=self.token)
                if found_configs:
                    configs = found_configs
            except Exception:
                pass

            # Splits and license: read from the info of the first config.
            try:
                selected = configs[0]
                infos = get_dataset_infos(dataset_id, token=self.token)
                info = None
                if selected in infos:
                    info = infos[selected]
                elif 'default' in infos:
                    info = infos['default']
                elif infos:
                    info = list(infos.values())[0]

                if info:
                    splits = list(info.splits.keys())
                    license_name = info.license or "unknown"
            except Exception:
                pass

            return {
                "status": "success",
                "configs": configs,
                "splits": splits,
                "license_detected": license_name
            }
        except Exception as e:
            return {"status": "error", "message": str(e)}

    def get_splits_for_config(self, dataset_id, config_name):
        try:
            infos = get_dataset_infos(dataset_id, config_name=config_name, token=self.token)
            if config_name in infos:
                splits = list(infos[config_name].splits.keys())
            elif len(infos) > 0:
                splits = list(list(infos.values())[0].splits.keys())
            else:
                splits = ['train', 'test']
            return {"status": "success", "splits": splits}
        except Exception:
            # Fall back to the common split names if the Hub lookup fails.
            return {"status": "success", "splits": ['train', 'test', 'validation']}

    def _sanitize_for_json(self, obj):
        """
        Recursively cleans data for JSON serialization: NaN/Inf become
        None and non-primitive types fall back to their string form.
        """
        if isinstance(obj, float):
            if math.isnan(obj) or math.isinf(obj):
                return None
            return obj
        elif isinstance(obj, dict):
            return {k: self._sanitize_for_json(v) for k, v in obj.items()}
        elif isinstance(obj, list):
            return [self._sanitize_for_json(v) for v in obj]
        elif isinstance(obj, (str, int, bool, type(None))):
            return obj
        else:
            return str(obj)
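    # Illustrative behaviour: _sanitize_for_json({"score": float("nan"),
    # "vec": [1.0, float("inf")]}) returns {"score": None, "vec": [1.0, None]};
    # bytes, datetimes, and other unknown types are stringified via str().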

    def _flatten_object(self, obj, parent_key='', sep='.'):
        """
        Recursively flattens nested dicts (including JSON-encoded strings)
        into dotted key paths for the UI dropdowns.
        """
        items = {}

        # Strings that look like JSON objects/arrays are parsed so their
        # inner keys become selectable paths too.
        if isinstance(obj, str):
            s = obj.strip()
            if (s.startswith('{') and s.endswith('}')) or (s.startswith('[') and s.endswith(']')):
                try:
                    obj = json.loads(s)
                except json.JSONDecodeError:
                    pass

        if isinstance(obj, dict):
            for k, v in obj.items():
                new_key = f"{parent_key}{sep}{k}" if parent_key else k
                items.update(self._flatten_object(v, new_key, sep=sep))
        elif isinstance(obj, list):
            new_key = f"{parent_key}" if parent_key else "list_content"
            items[new_key] = "List"
        else:
            items[parent_key] = type(obj).__name__

        return items
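    # Illustrative example: _flatten_object({"a": {"b": 1}, "tags": [1, 2]})
    # yields {"a.b": "int", "tags": "List"}; nested dicts become dotted
    # paths, while lists are collapsed to a single "List" marker.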

    def inspect_dataset(self, dataset_id, config, split):
        try:
            conf = config if config != 'default' else None
            ds_stream = load_dataset(dataset_id, name=conf, split=split, streaming=True, token=self.token)

            sample_rows = []
            available_paths = set()
            schema_map = {}

            # Sample the first 10 rows to infer the schema.
            for i, row in enumerate(ds_stream):
                if i >= 10:
                    break

                row = dict(row)

                clean_row = self._sanitize_for_json(row)
                sample_rows.append(clean_row)

                # Collect every dotted path seen across the sampled rows.
                flattened = self._flatten_object(row)
                available_paths.update(flattened.keys())

                # Track which top-level columns hold lists (possibly JSON-encoded).
                for k, v in row.items():
                    if k not in schema_map:
                        schema_map[k] = {"type": "Object"}

                    val = v
                    if isinstance(val, str):
                        try:
                            val = json.loads(val)
                        except json.JSONDecodeError:
                            pass

                    if isinstance(val, list):
                        schema_map[k]["type"] = "List"

            # Group the dotted paths by their root key for the UI tree.
            sorted_paths = sorted(list(available_paths))
            schema_tree = {}
            for path in sorted_paths:
                root = path.split('.')[0]
                if root not in schema_tree:
                    schema_tree[root] = []
                schema_tree[root].append(path)

            return {
                "status": "success",
                "samples": sample_rows,
                "schema_tree": schema_tree,
                "schema": schema_map,
                "dataset_id": dataset_id
            }
        except Exception as e:
            return {"status": "error", "message": str(e)}

    def _get_value_by_path(self, obj, path):
        """
        Retrieves a value by dotted path. Direct key access is tried first
        since it is the fastest and most common case.
        """
        if not path:
            return obj

        try:
            # Fast path: no dots means a plain top-level key.
            if '.' not in path:
                return obj[path]
        except (KeyError, TypeError, AttributeError):
            pass

        keys = path.split('.')
        current = obj

        for i, key in enumerate(keys):
            if current is None:
                return None

            try:
                # Numeric segments index into lists; everything else is a key.
                if isinstance(current, list) and key.isdigit():
                    current = current[int(key)]
                else:
                    current = current[key]
            except (KeyError, TypeError, IndexError, AttributeError):
                return None

            # Intermediate JSON-encoded strings are parsed so traversal
            # can continue into them.
            is_last_key = (i == len(keys) - 1)
            if not is_last_key and isinstance(current, str):
                s = current.strip()
                if (s.startswith('{') and s.endswith('}')) or (s.startswith('[') and s.endswith(']')):
                    try:
                        current = json.loads(s)
                    except json.JSONDecodeError:
                        return None

        return current
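    # Illustrative examples (hypothetical rows):
    #   _get_value_by_path({"a": {"b": [10, 20]}}, "a.b.1")  -> 20
    #   _get_value_by_path({"raw": '{"x": 5}'}, "raw.x")     -> 5
    #   _get_value_by_path({"a": 1}, "missing.key")          -> None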

    def _extract_from_list_logic(self, row, source_col, filter_key, filter_val, target_path):
        """
        FROM source_col FIND ITEM WHERE filter_key == filter_val EXTRACT target_path.
        """
        data = row.get(source_col)

        # JSON-encoded columns are parsed before searching.
        if isinstance(data, str):
            try:
                data = json.loads(data)
            except json.JSONDecodeError:
                return None

        if not isinstance(data, list):
            return None

        # Compare as strings so numeric filter values match their text form.
        matched_item = None
        for item in data:
            if isinstance(item, dict) and str(item.get(filter_key, '')) == str(filter_val):
                matched_item = item
                break

        if matched_item:
            return self._get_value_by_path(matched_item, target_path)

        return None
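    # Illustrative example (hypothetical row): with
    #   row = {"scores": [{"name": "bleu", "value": 0.4},
    #                     {"name": "rouge", "value": 0.6}]}
    # _extract_from_list_logic(row, "scores", "name", "rouge", "value") -> 0.6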

    def _apply_projection(self, row, recipe):
        new_row = {}

        # Built lazily: only 'python' columns need an eval context.
        eval_context = None

        for col_def in recipe['columns']:
            t_type = col_def.get('type', 'simple')
            target_col = col_def['name']

            try:
                if t_type == 'simple':
                    # Direct dotted-path mapping.
                    new_row[target_col] = self._get_value_by_path(row, col_def['source'])

                elif t_type == 'list_search':
                    new_row[target_col] = self._extract_from_list_logic(
                        row,
                        col_def['source'],
                        col_def['filter_key'],
                        col_def['filter_val'],
                        col_def['target_key']
                    )

                elif t_type == 'python':
                    if eval_context is None:
                        eval_context = row.copy()
                        eval_context['row'] = row
                        eval_context['json'] = json
                        eval_context['re'] = re
                        eval_context['requests'] = requests

                    # NOTE: eval() runs user-supplied expressions; recipes are
                    # assumed to come from a trusted operator.
                    val = eval(col_def['expression'], {}, eval_context)
                    new_row[target_col] = val

                elif t_type == 'requests':
                    # POST the configured JSON payload and store the response body.
                    payload = json.loads(col_def['rpay'])
                    new_row[target_col] = requests.post(
                        col_def['rurl'], json=payload, timeout=30
                    ).text

            except Exception as e:
                raise ValueError(f"Column '{target_col}' failed: {str(e)}")

        return new_row
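    # A hypothetical recipe combining the supported column types (assuming
    # the source rows have 'meta.question', 'scores', and 'text' fields):
    #   {"columns": [
    #       {"type": "simple", "name": "question", "source": "meta.question"},
    #       {"type": "list_search", "name": "bleu", "source": "scores",
    #        "filter_key": "name", "filter_val": "bleu", "target_key": "value"},
    #       {"type": "python", "name": "text_len", "expression": "len(text)"}]}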

    def _generate_card(self, source_id, target_id, recipe, license_name):
        card_data = DatasetCardData(
            language="en",
            license=license_name,
            tags=["dataset-command-center", "etl", "generated-dataset"],
            source_datasets=[source_id],
        )

        content = f"""
# {target_id.split('/')[-1]}

This dataset is a transformation of [{source_id}](https://huggingface.co/datasets/{source_id}).
It was generated using the **Hugging Face Dataset Command Center**.

## Transformation Recipe

The following operations were applied to the source data:

| Target Column | Operation Type | Source / Logic |
|---------------|----------------|----------------|
"""
        for col in recipe['columns']:
            c_type = col.get('type', 'simple')
            c_name = col['name']
            c_src = col.get('source', '-')

            logic = "-"
            if c_type == 'simple':
                logic = f"Mapped from `{c_src}`"
            elif c_type == 'list_search':
                logic = f"Get `{col['target_key']}` where `{col['filter_key']} == {col['filter_val']}`"
            elif c_type == 'python':
                logic = f"Python: `{col.get('expression')}`"

            content += f"| **{c_name}** | {c_type} | {logic} |\n"

        if recipe.get('filter_rule'):
            content += f"\n### Row Filtering\n**Filter Applied:** `{recipe['filter_rule']}`\n"

        content += f"\n## Original License\nThis dataset inherits the license: `{license_name}` from the source."

        # Build the card directly so the recipe table becomes the card body
        # (from_template would render the default skeleton instead).
        card = DatasetCard(f"---\n{card_data.to_yaml()}\n---\n{content}")
        return card

    def process_and_push(self, source_id, config, split, target_id, recipe, max_rows=None, new_license=None):
        logger.info(f"Job started: {source_id} -> {target_id}")
        conf = config if config != 'default' else None

        def gen():
            ds_stream = load_dataset(source_id, name=conf, split=split, streaming=True, token=self.token)
            count = 0
            for i, row in enumerate(ds_stream):
                if max_rows and count >= int(max_rows):
                    break

                row = dict(row)

                # Row filter: unlike preview, a crashing filter aborts the job
                # so a bad recipe fails loudly instead of silently dropping rows.
                if recipe.get('filter_rule'):
                    try:
                        ctx = row.copy()
                        ctx['row'] = row
                        ctx['json'] = json
                        ctx['re'] = re
                        ctx['requests'] = requests
                        if not eval(recipe['filter_rule'], {}, ctx):
                            continue
                    except Exception as e:
                        raise ValueError(f"Filter crashed on row {i}: {e}")

                try:
                    yield self._apply_projection(row, recipe)
                    count += 1
                except ValueError:
                    raise
                except Exception as e:
                    raise ValueError(f"Unexpected crash on row {i}: {e}")

        try:
            # Materialize the streamed, transformed rows and push to the Hub.
            new_dataset = datasets.Dataset.from_generator(gen)
            repo_id = target_id if '/' in target_id else f'{self.username}/{target_id}'
            new_dataset.push_to_hub(repo_id, token=self.token)

            # The card push is best-effort: a failure here should not fail the job.
            try:
                card = self._generate_card(source_id, repo_id, recipe, new_license or "unknown")
                card.push_to_hub(repo_id, token=self.token)
            except Exception as e:
                logger.error(f"Failed to push Dataset Card: {e}")

            return {"status": "success", "rows_processed": len(new_dataset)}

        except Exception as e:
            logger.error(f"Job Failed: {e}")
            return {"status": "failed", "error": str(e)}

    def preview_transform(self, dataset_id, config, split, recipe):
        conf = config if config != 'default' else None

        ds_stream = load_dataset(dataset_id, name=conf, split=split, streaming=True, token=self.token)
        processed = []

        for i, row in enumerate(ds_stream):
            # Stop once five preview rows have been produced.
            if len(processed) >= 5:
                break

            row = dict(row)

            # Apply the optional row filter; in preview, rows whose filter
            # crashes are skipped rather than aborting the whole run.
            passed = True
            if recipe.get('filter_rule'):
                try:
                    ctx = row.copy()
                    ctx['row'] = row
                    ctx['json'] = json
                    ctx['re'] = re
                    if not eval(recipe['filter_rule'], {}, ctx):
                        passed = False
                except Exception:
                    passed = False

            if passed:
                try:
                    new_row = self._apply_projection(row, recipe)
                    # Sanitize so the preview survives JSON serialization.
                    clean_new_row = self._sanitize_for_json(new_row)
                    processed.append(clean_new_row)
                except Exception as e:
                    # Surface per-row errors inline instead of failing the preview.
                    processed.append({"_preview_error": f"Row {i} Error: {str(e)}"})

        return processed
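

if __name__ == "__main__":
    # Minimal smoke test (illustrative, not part of the library): it assumes
    # a valid Hub token in the HF_TOKEN environment variable and network
    # access; "squad" is just an example dataset id.
    import os

    center = DatasetCommandCenter(token=os.environ.get("HF_TOKEN"))
    print(center.get_dataset_metadata("squad"))

    recipe = {
        "columns": [
            {"type": "simple", "name": "question", "source": "question"},
            {"type": "simple", "name": "answer_text", "source": "answers.text"},
        ]
    }
    print(center.preview_transform("squad", "default", "train", recipe))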