# Hugging Face Space header (page-scrape artifact): "Spaces: Running on CPU Upgrade"
import json | |
from tqdm import tqdm | |
import logging | |
import threading | |
from smolagents import CodeAgent | |
from custom_agent import CustomCodeAgent | |
from custom_litellm import LiteLLMModelWithBackOff | |
from huggingface_hub import hf_hub_download | |
from constants import REPO_ID, ADDITIONAL_AUTHORIZED_IMPORTS | |
from pathlib import Path | |
from prompts import reasoning_llm_system_prompt, chat_llm_system_prompt | |
# Serializes concurrent appends to the shared answers .jsonl file (see append_answer).
append_answer_lock = threading.Lock()
# Serializes concurrent appends of captured console output (see append_console_output).
append_console_output_lock = threading.Lock()
class TqdmLoggingHandler(logging.Handler):
    """Logging handler that emits records via tqdm.write, so log lines
    do not corrupt an active tqdm progress bar."""

    def emit(self, record):
        message = self.format(record)
        tqdm.write(message)
def read_only_open(*a, **kw):
    """A wrapper around the builtin open() that only allows read mode.

    Raises Exception if any mode other than 'r' is requested, whether
    given positionally or via the ``mode`` keyword; otherwise delegates
    straight to open().
    """
    positional_mode_forbidden = (
        len(a) > 1 and isinstance(a[1], str) and a[1] != 'r'
    )
    keyword_mode_forbidden = kw.get('mode', 'r') != 'r'
    if positional_mode_forbidden or keyword_mode_forbidden:
        raise Exception("Only mode='r' allowed for the function open")
    return open(*a, **kw)
def download_context(base_dir: str) -> str:
    """Download the benchmark context files into *base_dir*.

    Each file is force-downloaded from the REPO_ID dataset repo. Returns
    the path of the context directory relative to the repository root
    (two levels above this file); relative_to will raise if *base_dir*
    is not located under that root.
    """
    context_dir = "data/context"
    filenames = [
        "acquirer_countries.csv",
        "payments.csv",
        "merchant_category_codes.csv",
        "fees.json",
        "merchant_data.json",
        "manual.md",
        "payments-readme.md",
    ]
    for name in filenames:
        hf_hub_download(
            REPO_ID,
            repo_type="dataset",
            filename=f"{context_dir}/{name}",
            local_dir=base_dir,
            force_download=True,
        )
    root_dir = Path(__file__).resolve().parent.parent
    full_path = Path(base_dir) / context_dir
    return str(full_path.relative_to(root_dir))
def is_reasoning_llm(model_id: str) -> bool:
    """Return True when *model_id* is one of the known reasoning LLMs."""
    return model_id in {
        "openai/o1",
        "openai/o3",
        "openai/o3-mini",
        "deepseek/deepseek-reasoner",
    }
def get_tasks_to_run(data, total: int, base_filename: Path, tasks_ids: list[int]):
    """Return the tasks among ``data[:total]`` that still need to run.

    Tasks whose ``task_id`` already appears in the sibling
    ``<stem>_answers.jsonl`` file are skipped. If *tasks_ids* is not
    None, only tasks whose id is listed there are kept.

    Args:
        data: sequence of task dicts, each with a "task_id" key.
        total: number of leading entries of *data* to consider.
        base_filename: path whose stem locates the answers file.
        tasks_ids: optional whitelist of task ids, or None for all.
    """
    answers_file = base_filename.parent / f"{base_filename.stem}_answers.jsonl"
    done = set()
    if answers_file.exists():
        with open(answers_file, encoding="utf-8") as fh:
            # NOTE(review): ids are kept exactly as serialized in the jsonl,
            # while candidates below are cast to int — an answers file with
            # string ids would never match; confirm writers store ints.
            done = {json.loads(line)["task_id"] for line in fh if line.strip()}
    tasks = []
    for item in data[:total]:
        task_id = int(item["task_id"])
        if task_id in done:
            continue
        if tasks_ids is None or task_id in tasks_ids:
            tasks.append(item)
    return tasks
def append_answer(entry: dict, jsonl_file: Path) -> None:
    """Serialize *entry* as JSON and append it as one line of *jsonl_file*.

    Parent directories are created if missing; writes are guarded by
    append_answer_lock so concurrent threads never interleave lines.
    """
    jsonl_file.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(entry)
    with append_answer_lock:
        with open(jsonl_file, "a", encoding="utf-8") as fp:
            fp.write(serialized + "\n")
def append_console_output(captured_text: str, txt_file: Path) -> None:
    """Append *captured_text* plus a trailing newline to *txt_file*.

    Parent directories are created if missing; writes are guarded by
    append_console_output_lock to stay safe under threading.
    """
    txt_file.parent.mkdir(parents=True, exist_ok=True)
    with append_console_output_lock:
        with open(txt_file, "a", encoding="utf-8") as fp:
            fp.write(f"{captured_text}\n")
def create_code_agent_with_reasoning_llm(model_id: str, api_base=None, api_key=None, max_steps=10, ctx_path=None):
    """Build a CustomCodeAgent driven by a reasoning LLM.

    The model caps generation through max_completion_tokens (max_tokens
    is explicitly None for reasoning models), the sandbox's open() is
    swapped for a read-only variant, and the system prompt is formatted
    with *ctx_path* after construction.
    """
    model = LiteLLMModelWithBackOff(
        model_id=model_id,
        api_base=api_base,
        api_key=api_key,
        max_tokens=None,
        max_completion_tokens=3000,
    )
    agent = CustomCodeAgent(
        system_prompt=reasoning_llm_system_prompt,
        tools=[],
        model=model,
        additional_authorized_imports=ADDITIONAL_AUTHORIZED_IMPORTS,
        max_steps=max_steps,
        verbosity_level=3,
    )
    # Restrict file access inside the agent's Python executor to read-only.
    agent.python_executor.static_tools.update({"open": read_only_open})
    agent.system_prompt = agent.system_prompt.format(ctx_path=ctx_path)
    return agent
def create_code_agent_with_chat_llm(model_id: str, api_base=None, api_key=None, max_steps=10):
    """Build a CodeAgent driven by a chat LLM.

    Generation is capped via max_tokens=3000, and the sandbox's open()
    is swapped for a read-only variant before the agent is returned.
    """
    model = LiteLLMModelWithBackOff(
        model_id=model_id,
        api_base=api_base,
        api_key=api_key,
        max_tokens=3000,
    )
    agent = CodeAgent(
        system_prompt=chat_llm_system_prompt,
        tools=[],
        model=model,
        additional_authorized_imports=ADDITIONAL_AUTHORIZED_IMPORTS,
        max_steps=max_steps,
        verbosity_level=3,
    )
    # Restrict file access inside the agent's Python executor to read-only.
    agent.python_executor.static_tools.update({"open": read_only_open})
    return agent