File size: 6,907 Bytes
9613bf1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
"""
pipeline/code_analyzer.py
Qwen2.5-Coder 7B + LoRA adapter β€” converts raw code into structured
vulnerability descriptions that RoBERTa can classify.

Input:  raw code snippet (str)
Output: structured NL description (str)
"""

from __future__ import annotations
from typing import Optional
import torch

# ── Constants ────────────────────────────────────────────────────────────────

BASE_MODEL   = "Qwen/Qwen2.5-Coder-7B-Instruct"
ADAPTER_REPO = "martynattakit/vuln-analyzer-qwen-lora"
MAX_INPUT_CHARS  = 3000   # truncate very long functions before tokenizing
MAX_NEW_TOKENS   = 120    # structured description is short

SYSTEM_PROMPT = (
    "You are a security analyst. Given a code snippet, produce exactly one "
    "structured sentence describing the vulnerability it contains.\n\n"
    "Format: \"This function performs <operation> on <input> without "
    "<missing check>, which may allow an attacker to <impact>.\"\n\n"
    "Be specific about the operation and the missing check. "
    "Do not add any other text."
)

# ── Analyzer class ───────────────────────────────────────────────────────────

class CodeAnalyzer:
    """
    Wraps Qwen2.5-Coder 7B + LoRA adapter for code β†’ description inference.
    Lazy-loaded on first call β€” model is large (~5GB in 4-bit).
    """

    def __init__(
        self,
        base_model: str = BASE_MODEL,
        adapter_repo: str = ADAPTER_REPO,
        device: Optional[str] = None,
        load_in_4bit: bool = True,
    ):
        self.base_model   = base_model
        self.adapter_repo = adapter_repo
        self.load_in_4bit = load_in_4bit
        self.device = device or ("cuda" if torch.cuda.is_available() else "cpu")
        self._model     = None
        self._tokenizer = None

    def _load(self):
        """Lazy load base model + adapter on first inference call."""
        if self._model is not None:
            return

        from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig
        from peft import PeftModel

        print(f"[CodeAnalyzer] Loading tokenizer from {self.base_model}...")
        self._tokenizer = AutoTokenizer.from_pretrained(
            self.base_model, trust_remote_code=True
        )
        self._tokenizer.pad_token = self._tokenizer.eos_token

        print(f"[CodeAnalyzer] Loading base model ({self.base_model})...")
        if self.load_in_4bit:
            bnb_config = BitsAndBytesConfig(
                load_in_4bit=True,
                bnb_4bit_quant_type="nf4",
                bnb_4bit_compute_dtype=torch.float16,
                bnb_4bit_use_double_quant=True,
            )
            base = AutoModelForCausalLM.from_pretrained(
                self.base_model,
                quantization_config=bnb_config,
                device_map="auto",
                trust_remote_code=True,
            )
        else:
            base = AutoModelForCausalLM.from_pretrained(
                self.base_model,
                torch_dtype=torch.float16,
                device_map="auto",
                trust_remote_code=True,
            )

        print(f"[CodeAnalyzer] Loading LoRA adapter from {self.adapter_repo}...")
        self._model = PeftModel.from_pretrained(base, self.adapter_repo)
        self._model.eval()
        print("[CodeAnalyzer] Model ready.")

    def analyze(self, code: str) -> str:
        """
        Convert a raw code snippet into a structured vulnerability description.

        Args:
            code: Raw source code (any language).

        Returns:
            Structured description string:
            "This function performs X on Y without Z, which may allow an attacker to W."

        Raises:
            ValueError: If code is empty.
        """
        self._load()

        if not code or not code.strip():
            raise ValueError("Code input cannot be empty.")

        # Truncate very long functions β€” context window protection
        code_truncated = code[:MAX_INPUT_CHARS]

        messages = [
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user",   "content": f"Analyze this code:\n\n```\n{code_truncated}\n```"},
        ]

        prompt = self._tokenizer.apply_chat_template(
            messages,
            tokenize=False,
            add_generation_prompt=True,
        )

        inputs = self._tokenizer(prompt, return_tensors="pt").to(self._model.device)

        with torch.no_grad():
            output = self._model.generate(
                **inputs,
                max_new_tokens=MAX_NEW_TOKENS,
                do_sample=False,          # greedy β€” consistent output
                temperature=1.0,
                pad_token_id=self._tokenizer.eos_token_id,
            )

        # Decode only the newly generated tokens
        new_tokens = output[0][inputs["input_ids"].shape[1]:]
        description = self._tokenizer.decode(
            new_tokens, skip_special_tokens=True
        ).strip()

        # Fallback if output is empty or malformed
        if not description or len(description) < 20:
            description = (
                "This function contains a vulnerability that may allow "
                "an attacker to cause harm. Manual review recommended."
            )

        return description


# ── Module-level singleton ───────────────────────────────────────────────────

_analyzer: Optional[CodeAnalyzer] = None

def get_analyzer() -> CodeAnalyzer:
    """Return the module-level singleton analyzer."""
    global _analyzer
    if _analyzer is None:
        _analyzer = CodeAnalyzer()
    return _analyzer


def analyze(code: str) -> str:
    """Convenience function β€” analyze without instantiating manually."""
    return get_analyzer().analyze(code)


# ── CLI test ─────────────────────────────────────────────────────────────────

if __name__ == "__main__":
    test_snippets = [
        ('def get_user(username):\n    query = "SELECT * FROM users WHERE name = \'" + username + "\'"\n    return db.execute(query)', "SQL injection"),
        ('void copy(char *dst, char *src) {\n    strcpy(dst, src);\n}', "Buffer overflow"),
        ('def ping(host):\n    os.system("ping -c 1 " + host)', "Command injection"),
    ]

    analyzer = CodeAnalyzer()
    for code, expected in test_snippets:
        desc = analyzer.analyze(code)
        print(f"Expected: {expected}")
        print(f"Output:   {desc}")
        print()