import argparse
import json
import re
import os
import unicodedata
from typing import Tuple, List
from multiprocessing import Pool

import fasttext
import pandas as pd
from tqdm import tqdm
from transformers import LlamaTokenizerFast


language_model_map = {
    "en": "classifiers/ultra_fineweb_en.bin",
    "zh": "classifiers/ultra_fineweb_zh.bin"
}
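
# Note: the classifier paths above are resolved relative to the current working
# directory; adjust the entries (or add new language -> model mappings) if the
# .bin files are stored elsewhere.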


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("--language", type=str, required=True, help="Inference language, supported: en, zh.")
    parser.add_argument("--data-path", type=str, required=True, help="Data path.")
    parser.add_argument("--save-path", type=str, required=True, help="Save path root.")
    parser.add_argument("--content-key", type=str, required=True, help="Content key for inference.")
    parser.add_argument("--tokenizer-path", type=str, default="local_tokenizer", help="Tokenizer path.")
    parser.add_argument("--processes-num", type=int, default=64, help="Number of processes.")
    parser.add_argument("--write-batch-size", type=int, default=100, help="Write batch size.")
    parser.add_argument("--inplace", action="store_true", help="Skip files whose output already exists instead of reprocessing them.")
    return parser.parse_args()
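
# Example invocation (illustrative only; the script name and directory paths below
# are placeholders, not part of the original code):
#
#   python fasttext_infer.py \
#       --language en \
#       --data-path ./raw_data \
#       --save-path ./filtered_data \
#       --content-key text \
#       --processes-num 8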


def fasttext_preprocess_func(content: str, tokenizer: LlamaTokenizerFast) -> str:
    """Fasttext preprocess function.

    Args:
        content (str): Content to process.
        tokenizer (LlamaTokenizerFast): Tokenizer used to split the content into tokens.

    Returns:
        str: Processed normalized content.
    """

    # Collapse runs of three or more newlines into a single blank line.
    content = re.sub(r'\n{3,}', '\n\n', content)

    # Lower-case the content.
    content = content.lower()

    # Strip diacritics (combining marks) after NFKD normalization.
    content = ''.join(
        c for c in unicodedata.normalize('NFKD', content)
        if unicodedata.category(c) != 'Mn')

    # Tokenize, then re-join the decoded tokens with single spaces so that
    # fastText sees whitespace-separated tokens.
    token_ids = tokenizer.encode(content, add_special_tokens=False)
    single_text_list = []
    for token_id in token_ids:
        curr_text = tokenizer.decode([token_id])
        single_text_list.append(curr_text)

    content = ' '.join(single_text_list)

    # Escape control characters and collapse repeated spaces so each sample
    # stays on a single line, as fastText expects.
    content = re.sub(r'\n', '\\\\n', content)
    content = re.sub(r'\r', '\\\\r', content)
    content = re.sub(r'\t', '\\\\t', content)
    content = re.sub(r' +', ' ', content)
    content = content.strip()

    return content


def fasttext_infer(norm_content: str, fasttext_model: fasttext.FastText) -> Tuple[str, float]:
    """Fasttext inference function.

    Args:
        norm_content (str): Normalized input text.
        fasttext_model (fasttext.FastText): Loaded fastText classification model.

    Returns:
        Tuple[str, float]: Predicted label and the score of the positive class.
    """

    pred_label, pred_prob = fasttext_model.predict(norm_content)
    pred_label = pred_label[0]
    # Clamp the probability at 1; for the negative label, report the complementary
    # score so that a higher score always means "more likely positive".
    _score = min(pred_prob.tolist()[0], 1)
    if pred_label == "__label__neg":
        _score = 1 - _score

    return pred_label, _score
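
# For reference (illustrative, not part of the original code): fastText's predict()
# returns a tuple of label strings plus a numpy array of probabilities, e.g. roughly
#   fasttext_model.predict("some normalized text") -> (('__label__pos',), array([0.97]))
# which is why the code above indexes [0] into both parts before clamping the score.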


def load_data(file_path: str, content_key: str) -> List[str]:
    """Load data from file path.

    Args:
        file_path (str): File path.
        content_key (str): Content key.

    Returns:
        List[str]: List of content.
    """
    samples = []
    if file_path.endswith(".jsonl") or file_path.endswith(".json"):
        # Both .jsonl and .json files are read line by line, i.e. treated as JSON Lines.
        with open(file_path, "r", encoding="utf-8") as f:
            for line in f:
                data = json.loads(line.strip())
                if content_key in data:
                    if data[content_key] == "":
                        print("Empty text, continue")
                        continue
                    if data[content_key] is None:
                        print("None text, continue")
                        continue
                    samples.append(data[content_key])
    elif file_path.endswith(".parquet"):
        df = pd.read_parquet(file_path)
        for _, row in df.iterrows():
            if content_key in row:
                if row[content_key] == "":
                    print("Empty text, continue")
                    continue
                if row[content_key] is None:
                    print("None text, continue")
                    continue
                samples.append(row[content_key])
    else:
        raise ValueError(f"Unsupported file type: {file_path}")
    return samples
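
# Expected input format (illustrative; "text" stands in for whatever --content-key
# names): .jsonl/.json files hold one JSON object per line, e.g.
#   {"text": "document body ..."}
# while .parquet files need a column matching the content key.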


def process_file(
        file_path: str,
        tokenizer_path: str,
        fasttext_model_path: str,
        save_path: str,
        item: int,
        content_key: str,
        inplace: bool,
        write_batch_size: int) -> None:
    """Process a single file.

    Args:
        file_path (str): File path to process.
        tokenizer_path (str): Tokenizer path.
        fasttext_model_path (str): Fasttext model path.
        save_path (str): Save path.
        item (int): Current process item index.
        content_key (str): Content key.
        inplace (bool): If True, skip files whose output already exists.
        write_batch_size (int): Write batch size.
    """

    # Each worker process loads its own tokenizer and fastText model.
    tokenizer = LlamaTokenizerFast.from_pretrained(tokenizer_path)
    fasttext_model = fasttext.load_model(fasttext_model_path)

    all_texts = load_data(file_path, content_key)

    # Positive samples are written to <input name>_fasttext_pos.jsonl under save_path.
    file_name = os.path.basename(file_path)
    curr_file_name = ".".join(file_name.split(".")[:-1])

    output_file = f"{curr_file_name}_fasttext_pos.jsonl"
    output_file = os.path.join(save_path, output_file)

    if inplace and os.path.exists(output_file):
        print(f"File {output_file} already exists, skip")
        return

    if os.path.exists(output_file):
        print(f"File {output_file} already exists, remove it")
        os.remove(output_file)

    results = []
    print(f"ID: {item}, Begin to process {file_path}, total {len(all_texts)} samples, results will be saved in {output_file}")
    for text in tqdm(all_texts):
        norm_content = fasttext_preprocess_func(text, tokenizer)
        label, score = fasttext_infer(norm_content, fasttext_model)

        # Keep only samples classified as positive.
        if label == "__label__pos":
            curr_result = {"content": text, "pred_label": label, "pred_score": score}
            results.append(curr_result)

        # Flush to disk in batches to bound memory usage.
        if len(results) >= write_batch_size:
            with open(output_file, "a", encoding="utf-8") as f:
                f.write("\n".join(json.dumps(r, ensure_ascii=False) for r in results) + "\n")
            results.clear()

    # Write any remaining results.
    if results:
        with open(output_file, "a", encoding="utf-8") as f:
            f.write("\n".join(json.dumps(r, ensure_ascii=False) for r in results) + "\n")


def main():
    args = parse_args()
    language = args.language
    data_path = args.data_path
    save_path = args.save_path
    content_key = args.content_key
    tokenizer_path = args.tokenizer_path
    processes_num = args.processes_num
    write_batch_size = args.write_batch_size
    inplace = args.inplace

    assert os.path.exists(data_path), f"Data path {data_path} does not exist"
    assert os.path.exists(tokenizer_path), f"Tokenizer path {tokenizer_path} does not exist"

    assert language in language_model_map, f"Language {language} not supported"
    fasttext_model_path = language_model_map[language]

    if not os.path.exists(save_path):
        os.makedirs(save_path, exist_ok=True)

    data_path_list = os.listdir(data_path)
    data_path_list = [os.path.join(data_path, file_name) for file_name in data_path_list]

    print("=" * 100)
    print(f"Begin processing\n"
          f"- data path: {data_path}\n"
          f"- save path: {save_path}\n"
          f"- content key: {content_key}\n"
          f"- tokenizer path: {tokenizer_path}\n"
          f"- processes num: {processes_num}\n"
          f"- write batch size: {write_batch_size}\n"
          f"- inplace: {inplace}")
    print("=" * 100)

    print(f"Total {len(data_path_list)} files to process")

    # Process the files in parallel, one worker invocation per file.
    with Pool(processes=processes_num) as pool:
        pool.starmap(process_file, [(
            file_path, tokenizer_path, fasttext_model_path, save_path, item, content_key, inplace, write_batch_size)
            for item, file_path in enumerate(data_path_list)])

    print("Finished processing all files")


if __name__ == "__main__":
    main()