import json
import logging
import threading
from pathlib import Path

from huggingface_hub import hf_hub_download
from smolagents import CodeAgent
from tqdm import tqdm

from constants import REPO_ID, ADDITIONAL_AUTHORIZED_IMPORTS
from custom_agent import CustomCodeAgent
from custom_litellm import LiteLLMModelWithBackOff
from prompts import reasoning_llm_system_prompt, chat_llm_system_prompt

append_answer_lock = threading.Lock()
append_console_output_lock = threading.Lock()


class TqdmLoggingHandler(logging.Handler):
    """Logging handler that emits records via tqdm.write so that log lines
    do not corrupt active tqdm progress bars."""

    def emit(self, record):
        tqdm.write(self.format(record))


def read_only_open(*a, **kw):
    """Wrapper around the builtin open() that only permits read-only text
    mode ('r'), whether the mode is passed positionally or as a keyword."""
    if (len(a) > 1 and isinstance(a[1], str) and a[1] != 'r') or kw.get('mode', 'r') != 'r':
        raise ValueError("Only mode='r' is allowed for open()")
    return open(*a, **kw)
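
# A minimal sketch of the guard above in action (hypothetical file names,
# shown for illustration only):
#
#     read_only_open("data/context/manual.md")   # allowed: defaults to 'r'
#     read_only_open("out.txt", "w")             # raises ValueError
#     read_only_open("out.txt", mode="a")        # raises ValueError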


def download_context(base_dir: str) -> str:
    """Download the benchmark context files from the Hugging Face Hub into
    base_dir and return the context directory path relative to the
    repository root."""
    ctx_files = [
        "data/context/acquirer_countries.csv",
        "data/context/payments.csv",
        "data/context/merchant_category_codes.csv",
        "data/context/fees.json",
        "data/context/merchant_data.json",
        "data/context/manual.md",
        "data/context/payments-readme.md",
    ]
    for f in ctx_files:
        hf_hub_download(REPO_ID, repo_type="dataset", filename=f, local_dir=base_dir, force_download=True)

    # All context files share one directory, so derive it from the first
    # entry and express it relative to the repository root.
    root_dir = Path(__file__).resolve().parent.parent
    full_path = Path(base_dir) / Path(ctx_files[0]).parent
    relative_path = full_path.relative_to(root_dir)
    return str(relative_path)


def is_reasoning_llm(model_id: str) -> bool:
    """Return True if model_id names a reasoning model; these get a dedicated
    system prompt and a max_completion_tokens limit instead of max_tokens."""
    reasoning_llm_list = [
        "openai/o1",
        "openai/o3",
        "openai/o3-mini",
        "deepseek/deepseek-reasoner",
    ]
    return model_id in reasoning_llm_list


def get_tasks_to_run(data, total: int, base_filename: Path, tasks_ids: list[int]):
    """Return the tasks that still need to run: those whose task_id is not
    already recorded in the answers file and, when tasks_ids is given, is
    listed in it."""
    f = base_filename.parent / f"{base_filename.stem}_answers.jsonl"
    done = set()
    if f.exists():
        with open(f, encoding="utf-8") as fh:
            done = {json.loads(line)["task_id"] for line in fh if line.strip()}

    tasks = []
    for i in range(total):
        task_id = int(data[i]["task_id"])
        if task_id not in done and (tasks_ids is None or task_id in tasks_ids):
            tasks.append(data[i])
    return tasks
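
# Each line of the *_answers.jsonl checkpoint is assumed to be a JSON object
# carrying at least a "task_id" field, e.g. (a sketch, not the full schema):
#
#     {"task_id": 1234, "answer": "42"}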


def append_answer(entry: dict, jsonl_file: Path) -> None:
    """Append one answer entry to the JSONL file, serialized by a lock so
    concurrent worker threads cannot interleave writes."""
    jsonl_file.parent.mkdir(parents=True, exist_ok=True)
    with append_answer_lock, open(jsonl_file, "a", encoding="utf-8") as fp:
        fp.write(json.dumps(entry) + "\n")


def append_console_output(captured_text: str, txt_file: Path) -> None:
    """Append captured console output to the text file, serialized by a lock
    so logs from concurrent worker threads stay intact."""
    txt_file.parent.mkdir(parents=True, exist_ok=True)
    with append_console_output_lock, open(txt_file, "a", encoding="utf-8") as fp:
        fp.write(captured_text + "\n")


def create_code_agent_with_reasoning_llm(model_id: str, api_base=None, api_key=None, max_steps=10, ctx_path=None):
    """Build a CustomCodeAgent for a reasoning model, passing the token limit
    as max_completion_tokens rather than max_tokens and substituting
    ctx_path into the system prompt."""
    agent = CustomCodeAgent(
        system_prompt=reasoning_llm_system_prompt,
        tools=[],
        model=LiteLLMModelWithBackOff(
            model_id=model_id, api_base=api_base, api_key=api_key, max_tokens=None, max_completion_tokens=3000),
        additional_authorized_imports=ADDITIONAL_AUTHORIZED_IMPORTS,
        max_steps=max_steps,
        verbosity_level=3,
    )
    # Restrict the agent's Python executor to read-only file access.
    agent.python_executor.static_tools.update({"open": read_only_open})

    agent.system_prompt = agent.system_prompt.format(ctx_path=ctx_path)
    return agent


def create_code_agent_with_chat_llm(model_id: str, api_base=None, api_key=None, max_steps=10):
    """Build a CodeAgent for a standard chat model, passing the token limit
    as max_tokens."""
    agent = CodeAgent(
        system_prompt=chat_llm_system_prompt,
        tools=[],
        model=LiteLLMModelWithBackOff(model_id=model_id, api_base=api_base, api_key=api_key, max_tokens=3000),
        additional_authorized_imports=ADDITIONAL_AUTHORIZED_IMPORTS,
        max_steps=max_steps,
        verbosity_level=3,
    )
    # Restrict the agent's Python executor to read-only file access.
    agent.python_executor.static_tools.update({"open": read_only_open})
    return agent
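
# A minimal end-to-end sketch of how these helpers might be wired together
# (the model id, paths, and question are assumptions for illustration only):
#
#     ctx_path = download_context(base_dir=str(Path(__file__).resolve().parent.parent))
#     model_id = "openai/o3-mini"
#     if is_reasoning_llm(model_id):
#         agent = create_code_agent_with_reasoning_llm(model_id, ctx_path=ctx_path)
#     else:
#         agent = create_code_agent_with_chat_llm(model_id)
#     answer = agent.run("How many payments are in the dataset?")
#     append_answer({"task_id": 1, "answer": str(answer)},
#                   Path("output/run_answers.jsonl"))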
|