| import os |
| import re |
| import time |
| import regex |
| import requests |
| from tqdm import tqdm |
| from typing import Union, Any, List, Set |
|
|
| from ..core.logging import logger |
|
|
def make_parent_folder(path: str):
    """Ensure that the directory which would contain ``path`` exists.

    Nothing happens when ``path`` has no directory component or when that
    directory is already present.

    Args:
        path (str): A file path whose parent directory should be available.
    """
    parent = os.path.dirname(path)
    # Guard clause: a bare file name has no parent, and an existing parent needs no work.
    if not parent or os.path.exists(parent):
        return
    logger.info(f"creating folder {parent} ...")
    # exist_ok=True tolerates the directory appearing between the check and the call.
    os.makedirs(parent, exist_ok=True)
|
|
def safe_remove(data: Union[List[Any], Set[Any]], remove_value: Any):
    """Removes a value from a list or set in place, doing nothing if it is absent.

    Args:
        data (Union[List[Any], Set[Any]]): The collection to modify in place.
        remove_value (Any): The value to remove. For a list, only the first
            occurrence is removed.
    """
    try:
        data.remove(remove_value)
    except (ValueError, KeyError):
        # list.remove signals a missing value with ValueError, set.remove with
        # KeyError; both mean "nothing to remove" here.
        pass
|
|
def generate_dynamic_class_name(base_name: str) -> str:
    """Build a CamelCase-style name from an arbitrary string.

    Every maximal run of ASCII letters and digits is treated as one word; all
    other characters act as separators. Each word is passed through
    ``str.capitalize`` (first character upper-cased, the rest lower-cased) and
    the words are concatenated.

    Args:
        base_name (str): The text to derive the name from.

    Returns:
        str: The concatenated name, or ``'DefaultClassName'`` when the input
        contains no ASCII letters or digits.
    """
    words = re.findall(r'[a-zA-Z0-9]+', base_name)
    joined = ''.join(map(str.capitalize, words))
    if not joined:
        return 'DefaultClassName'
    return joined
|
|
def normalize_text(s: str) -> str:
    """Normalize a string for loose text comparison.

    The steps are applied in this order: lower-case the text, replace
    underscores with spaces, replace the standalone words "a", "an" and "the"
    with a space, then collapse all whitespace runs into single spaces and
    trim the ends.

    Args:
        s (str): The text to normalize.

    Returns:
        str: The normalized text.
    """
    lowered = s.lower()
    # Underscores count as word characters for \b, so they must become spaces
    # before the article pattern runs.
    without_underscores = lowered.replace("_", " ")
    without_articles = regex.sub(r'\b(a|an|the)\b', ' ', without_underscores)
    return ' '.join(without_articles.split())
|
|
|
|
def download_file(url: str, save_file: str, max_retries=3, timeout=10):
    """Downloads a URL to a local file, resuming a partial download when possible.

    The expected size is read from the ``Content-Length`` header of a HEAD
    request. If a local file already exists, the remaining bytes are requested
    with an HTTP ``Range`` header and appended; if the server does not honour
    the range, the file is rewritten from the beginning.

    Args:
        url (str): The URL to download.
        save_file (str): The local path to write to. Its parent folder is
            created if necessary.
        max_retries (int): The maximum number of download attempts.
        timeout (int): The timeout in seconds passed to each HTTP request.

    Raises:
        ValueError: If an error other than a connection error or timeout
            occurs (including an HTTP error status).
        RuntimeError: If all attempts are used up without a complete download.
    """
    make_parent_folder(save_file)
    for attempt in range(max_retries):
        try:
            resume_byte_pos = os.path.getsize(save_file) if os.path.exists(save_file) else 0

            # requests.head does not follow redirects by default while
            # requests.get does; follow them so the size describes the same
            # resource, and bound the request with the same timeout.
            response_head = requests.head(url=url, allow_redirects=True, timeout=timeout)
            total_size = int(response_head.headers.get("content-length", 0))

            # Only skip when there is local data. Without this guard a missing
            # Content-Length (total_size == 0) would make "0 >= 0" report a
            # finished download although nothing was fetched.
            if resume_byte_pos and resume_byte_pos >= total_size:
                logger.info("File already downloaded completely.")
                return

            headers = {'Range': f'bytes={resume_byte_pos}-'} if resume_byte_pos else {}
            with requests.get(url=url, stream=True, headers=headers, timeout=timeout) as response:
                response.raise_for_status()

                # 206 Partial Content confirms the server honoured the Range
                # header. Any other success status carries the full body, so
                # appending it would corrupt the file.
                resumed = bool(resume_byte_pos) and response.status_code == 206
                if not resumed:
                    resume_byte_pos = 0
                mode = 'ab' if resumed else 'wb'

                # Context managers close the file and progress bar on errors too.
                with open(save_file, mode) as file, tqdm(
                    total=total_size, unit="iB", unit_scale=True, initial=resume_byte_pos
                ) as progress_bar:
                    for chunk_data in response.iter_content(chunk_size=1024):
                        if chunk_data:
                            size = file.write(chunk_data)
                            progress_bar.update(size)

            # total_size is the size of the whole resource, so it is the
            # expected final file size whether or not the download was resumed.
            if os.path.getsize(save_file) >= total_size:
                logger.info("Download completed successfully.")
                break
            else:
                logger.warning("File size mismatch, retrying...")
                time.sleep(5)
        except (requests.ConnectionError, requests.Timeout) as e:
            logger.warning(f"Download error: {e}. Retrying ({attempt+1}/{max_retries})...")
            time.sleep(5)
        except Exception as e:
            error_message = f"Unexpected error: {e}"
            logger.error(error_message)
            raise ValueError(error_message) from e
    else:
        # The loop finished without a break: no attempt produced a complete file.
        error_message = "Exceeded maximum retries. Download failed."
        logger.error(error_message)
        raise RuntimeError(error_message)
|
|