Skip to content

Benchmarking

benchmark_sampling(llm: AutoregressiveSampler, system_message: str, question_list: list[str], answer_list: list[str], enable_thinking: bool, chat_template: bool, sampling_techniques: list[bool], max_questions: int = 0, output_file_name: str = 'math500_power_sampling_results.csv', **kwargs: Any) -> None

Benchmark different sampling techniques on a dataset of math problems.

Parameters:

Name Type Description Default
llm AutoregressiveSampler

The AutoregressiveSampler instance to use for generation.

required
system_message str

The system message to include in prompts.

required
question_list list[str]

List of formatted questions to benchmark.

required
answer_list list[str]

List of correct answers corresponding to the questions.

required
enable_thinking bool

Whether to enable thinking mode in chat templates.

required
chat_template bool

Whether to use chat template formatting for prompts.

required
sampling_techniques list[bool]

List of booleans indicating which sampling techniques to use. [0]: Naive sampling (temperature=1.0) [1]: Low temperature sampling [2]: Power sampling (MCMC)

required
max_questions int

Maximum number of questions to process. 0 means process all.

0
output_file_name str

Path to the output CSV file for results.

'math500_power_sampling_results.csv'
**kwargs Any

Additional keyword arguments passed to sampling functions. log_file_path: Base directory for logging individual question results.

{}

Returns:

Type Description
None

None. Results are written to the output CSV file.

Source code in pita/utils/benchmarking_utils.py
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
def benchmark_sampling(
    llm: AutoregressiveSampler,
    system_message: str,
    question_list: list[str],
    answer_list: list[str],
    enable_thinking: bool,
    chat_template: bool,
    sampling_techniques: list[bool],
    max_questions: int = 0,
    output_file_name: str = "math500_power_sampling_results.csv",
    **kwargs: Any
) -> None:
    """
    Benchmark different sampling techniques on a dataset of math problems.

    Args:
        llm: The AutoregressiveSampler instance to use for generation.
        system_message: The system message to include in prompts.
        question_list: List of formatted questions to benchmark.
        answer_list: List of correct answers corresponding to the questions.
        enable_thinking: Whether to enable thinking mode in chat templates.
        chat_template: Whether to use chat template formatting for prompts.
        sampling_techniques: List of booleans indicating which sampling techniques to use.
            [0]: Naive sampling (temperature=1.0)
            [1]: Low temperature sampling
            [2]: Power sampling (MCMC)
        max_questions: Maximum number of questions to process. 0 means process all.
        output_file_name: Path to the output CSV file for results.
        **kwargs: Additional keyword arguments passed to sampling functions.
            log_file_path: Base directory for logging individual question results.

    Returns:
        None. Results are written to the output CSV file.
    """
    # Create the output directory only when the path actually has one;
    # os.makedirs("") raises FileNotFoundError for bare file names.
    output_dir = os.path.dirname(output_file_name)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    # Template for per-question log files, defaulting next to the output CSV
    log_base_dir = kwargs.get("log_file_path", output_dir or ".")
    log_file_path_template = os.path.join(log_base_dir, "question_{}.csv")

    # Context manager guarantees the CSV handle is closed even if sampling raises
    with open(output_file_name, "w") as output_file:

        # Iterate over the dataset
        for question_index, question in enumerate(question_list):

            # Stop once the requested number of questions has been processed
            # (max_questions == 0 means "no limit")
            if max_questions != 0 and question_index == max_questions:
                break

            # Retrieve the reference answer from the dataset
            answer = answer_list[question_index]

            # Prepare prompt based on whether LLM has chat template or not
            if chat_template:
                formatted_prompt = tokenizer_chat_template(llm.tokenizer, enable_thinking, system_message, question)
            else:
                formatted_prompt = system_message + question

            # Store the prompt and reference answer in the result row
            result_row = {
                "question": formatted_prompt,
                "correct_answer": answer
            }

            if sampling_techniques[2]:  # Power Sampling (MCMC)
                # Route this question's detailed logs to its own file
                kwargs["log_file_path"] = log_file_path_template.format(question_index)

                # Time how long it takes to get a response
                start_time = time.time()
                output = llm.token_sample(formatted_prompt, **kwargs)
                elapsed = time.time() - start_time

                # Decode once and reuse for both the stored completion and the parsed answer
                completion = llm.tokenizer.decode(output.tokens, skip_special_tokens=False)
                result_row["mcmc_completion"] = completion
                result_row["mcmc_output_token_count"] = len(output.tokens)
                result_row["mcmc_time_to_solution"] = elapsed
                result_row["mcmc_answer"] = parse_answer(completion)

            if sampling_techniques[1]:  # Low Temperature Sampling
                # Time how long it takes to get a response
                start_time = time.time()
                output = llm.sample(formatted_prompt)
                elapsed = time.time() - start_time

                completion = llm.tokenizer.decode(output.tokens, skip_special_tokens=False)
                result_row["naive_completion"] = completion
                result_row["naive_sampling_output_token_count"] = len(output.tokens)
                result_row["naive_sampling_time_to_solution"] = elapsed
                result_row["naive_answer"] = parse_answer(completion)

            if sampling_techniques[0]:  # Naive Sampling
                # Temporarily force temperature to 1.0; restore it even if sampling raises
                saved_temperature = llm.sampling_params.temperature
                llm.sampling_params.temperature = 1.0
                try:
                    start_time = time.time()
                    output = llm.sample(formatted_prompt)
                    elapsed = time.time() - start_time
                finally:
                    llm.sampling_params.temperature = saved_temperature

                completion = llm.tokenizer.decode(output.tokens, skip_special_tokens=False)
                result_row["std_completion"] = completion
                result_row["std_sampling_output_token_count"] = len(output.tokens)
                result_row["std_sampling_time_to_solution"] = elapsed
                result_row["std_answer"] = parse_answer(completion)

            # Stream the row to disk immediately so partial results survive crashes;
            # the CSV header is only emitted with the first row.
            df = pd.DataFrame([result_row])
            df.to_csv(output_file, index=False, header=(question_index == 0))
            output_file.flush()
            os.fsync(output_file.fileno())

format_dataset(dataset: datasets.Dataset, pre_question: str, post_question: str) -> tuple[list[str], list[str]]

Format a dataset by adding pre and post question templates to each problem.

Parameters:

Name Type Description Default
dataset Dataset

The dataset containing problems and answers.

required
pre_question str

Text to prepend before each problem.

required
post_question str

Text to append after each problem.

required

Returns:

Type Description
list[str]

A tuple of (question_list, answer_list) where question_list contains

list[str]

formatted questions and answer_list contains corresponding answers.

Source code in pita/utils/benchmarking_utils.py
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
def format_dataset(
    dataset: datasets.Dataset,
    pre_question: str,
    post_question: str
) -> tuple[list[str], list[str]]:
    """
    Wrap every problem in the dataset with pre/post question templates.

    Args:
        dataset: The dataset containing problems and answers.
        pre_question: Text to prepend before each problem.
        post_question: Text to append after each problem.

    Returns:
        A tuple of (question_list, answer_list) where question_list contains
        formatted questions and answer_list contains corresponding answers.
    """
    question_list: list[str] = []
    answer_list: list[str] = []

    # Single pass over the dataset: wrap each problem and collect its answer
    for entry in dataset:
        question_list.append(f"{pre_question}{entry['problem']}{post_question}")
        answer_list.append(entry["answer"])

    return question_list, answer_list

load_benchmark(dataset_name: str) -> tuple[str, list[str], list[str]]

Load a benchmark dataset by name and return formatted questions and answers.

Parameters:

Name Type Description Default
dataset_name str

Name of the dataset to load. Supported values are "MATH500" and "AIME".

required

Returns:

Type Description
str

A tuple of (system_message, question_list, answer_list) where:

list[str]
  • system_message: The system message to use for the chat template
list[str]
  • question_list: List of formatted questions
tuple[str, list[str], list[str]]
  • answer_list: List of corresponding answers

Raises:

Type Description
ValueError

If the dataset_name is not supported.

Source code in pita/utils/benchmarking_utils.py
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
def load_benchmark(
    dataset_name: str
) -> tuple[str, list[str], list[str]]:
    """
    Load a benchmark dataset by name and return formatted questions and answers.

    Args:
        dataset_name: Name of the dataset to load. Supported values are "MATH500" and "AIME".

    Returns:
        A tuple of (system_message, question_list, answer_list) where:
        - system_message: The system message to use for the chat template
        - question_list: List of formatted questions
        - answer_list: List of corresponding answers

    Raises:
        ValueError: If the dataset_name is not supported.
    """
    # Fetch the raw dataset for the requested benchmark
    if dataset_name == "MATH500":
        dataset = datasets.load_dataset("HuggingFaceH4/MATH-500")["test"]
    elif dataset_name == "AIME":
        # AIME 2025 ships in two parts; merge them into one dataset
        dataset = datasets.concatenate_datasets([
            datasets.load_dataset("opencompass/AIME2025", "AIME2025-I")["test"],
            datasets.load_dataset("opencompass/AIME2025", "AIME2025-II")["test"],
        ])
    else:
        raise ValueError(f"Dataset {dataset_name} not supported for benchmarking.")

    # Normalize the schema shared by both benchmarks:
    # lowercase column names and string-typed answers
    dataset = dataset.map(lambda x: {k.lower(): v for k, v in x.items()})
    dataset = dataset.cast_column('answer', datasets.Value('string'))

    # AIME labels its problems "question"; align with the "problem" column used downstream
    if dataset_name == "AIME":
        dataset = dataset.rename_column("question", "problem")

    # Both benchmarks share the math prompt templates
    system_message = MATH_SYSTEM_MESSAGE
    pre_question = MATH_PRE_QUESTION
    post_question = MATH_ANSWER_FORMAT

    # Format the dataset and return the system message, question list, and answer list
    question_list, answer_list = format_dataset(dataset, pre_question, post_question)
    return system_message, question_list, answer_list

tokenizer_chat_template(tokenizer: AutoTokenizer, enable_thinking: bool, system_message: str, user_message: str) -> str

Format messages for chat models using the tokenizer's chat template.

Parameters:

Name Type Description Default
tokenizer AutoTokenizer

The AutoTokenizer instance to use for formatting.

required
enable_thinking bool

Whether to enable thinking mode in the chat template.

required
system_message str

The system message content to include.

required
user_message str

The user message content to include.

required

Returns:

Type Description
str

The formatted prompt string ready for the model.

Source code in pita/utils/benchmarking_utils.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
def tokenizer_chat_template(
    tokenizer: AutoTokenizer,
    enable_thinking: bool,
    system_message: str,
    user_message: str,
) -> str:
    """
    Build a model-ready prompt from system/user messages via the tokenizer's chat template.

    Args:
        tokenizer: The AutoTokenizer instance to use for formatting.
        enable_thinking: Whether to enable thinking mode in the chat template.
        system_message: The system message content to include.
        user_message: The user message content to include.

    Returns:
        The formatted prompt string ready for the model.
    """
    # Two-turn conversation: the system message (which, for benchmarks, should
    # explicitly request reasoning and a boxed answer) followed by the user turn
    conversation = [
        {"role": "system", "content": system_message},
        {"role": "user", "content": user_message},
    ]

    # Render to a plain string (tokenize=False) with the generation prompt appended
    return tokenizer.apply_chat_template(
        conversation,
        tokenize = False,
        add_generation_prompt = True,
        enable_thinking = enable_thinking
    )