|
import os
|
|
import json
|
|
import re
|
|
import sys
|
|
import io
|
|
import contextlib
|
|
import warnings
|
|
from typing import Optional, List, Any, Tuple
|
|
from PIL import Image
|
|
import streamlit as st
|
|
import pandas as pd
|
|
import base64
|
|
from io import BytesIO
|
|
from together import Together
|
|
from e2b_code_interpreter import Sandbox
|
|
|
|
warnings.filterwarnings("ignore", category=UserWarning, module="pydantic")
|
|
|
|
pattern = re.compile(r"```python\n(.*?)\n```", re.DOTALL)
|
|
|
|
def code_interpret(e2b_code_interpreter: Sandbox, code: str) -> Optional[List[Any]]:
|
|
with st.spinner('Executing code in E2B sandbox...'):
|
|
stdout_capture = io.StringIO()
|
|
stderr_capture = io.StringIO()
|
|
|
|
with contextlib.redirect_stdout(stdout_capture), contextlib.redirect_stderr(stderr_capture):
|
|
with warnings.catch_warnings():
|
|
warnings.simplefilter("ignore")
|
|
exec = e2b_code_interpreter.run_code(code)
|
|
|
|
if stderr_capture.getvalue():
|
|
print("[Code Interpreter Warnings/Errors]", file=sys.stderr)
|
|
print(stderr_capture.getvalue(), file=sys.stderr)
|
|
|
|
if stdout_capture.getvalue():
|
|
print("[Code Interpreter Output]", file=sys.stdout)
|
|
print(stdout_capture.getvalue(), file=sys.stdout)
|
|
|
|
if exec.error:
|
|
print(f"[Code Interpreter ERROR] {exec.error}", file=sys.stderr)
|
|
return None
|
|
return exec.results
|
|
|
|
def match_code_blocks(llm_response: str) -> str:
|
|
match = pattern.search(llm_response)
|
|
if match:
|
|
code = match.group(1)
|
|
return code
|
|
return ""
|
|
|
|
def chat_with_llm(e2b_code_interpreter: Sandbox, user_message: str, dataset_path: str) -> Tuple[Optional[List[Any]], str]:
|
|
|
|
system_prompt = f"""You're a Python data scientist and data visualization expert. You are given a dataset at path '{dataset_path}' and also the user's query.
|
|
You need to analyze the dataset and answer the user's query with a response and you run Python code to solve them.
|
|
IMPORTANT: Always use the dataset path variable '{dataset_path}' in your code when reading the CSV file."""
|
|
|
|
messages = [
|
|
{"role": "system", "content": system_prompt},
|
|
{"role": "user", "content": user_message},
|
|
]
|
|
|
|
with st.spinner('Getting response from Together AI LLM model...'):
|
|
client = Together(api_key=st.session_state.together_api_key)
|
|
response = client.chat.completions.create(
|
|
model=st.session_state.model_name,
|
|
messages=messages,
|
|
)
|
|
|
|
response_message = response.choices[0].message
|
|
python_code = match_code_blocks(response_message.content)
|
|
|
|
if python_code:
|
|
code_interpreter_results = code_interpret(e2b_code_interpreter, python_code)
|
|
return code_interpreter_results, response_message.content
|
|
else:
|
|
st.warning(f"Failed to match any Python code in model's response")
|
|
return None, response_message.content
|
|
|
|
def upload_dataset(code_interpreter: Sandbox, uploaded_file) -> str:
|
|
dataset_path = f"./{uploaded_file.name}"
|
|
|
|
try:
|
|
code_interpreter.files.write(dataset_path, uploaded_file)
|
|
return dataset_path
|
|
except Exception as error:
|
|
st.error(f"Error during file upload: {error}")
|
|
raise error
|
|
|
|
|
|
def main():
|
|
"""Main Streamlit application."""
|
|
st.set_page_config(page_title="π AI Data Visualization Agent", page_icon="π", layout="wide")
|
|
|
|
st.title("π AI Data Visualization Agent")
|
|
st.write("Upload your dataset and ask questions about it!")
|
|
|
|
|
|
if 'together_api_key' not in st.session_state:
|
|
st.session_state.together_api_key = ''
|
|
if 'e2b_api_key' not in st.session_state:
|
|
st.session_state.e2b_api_key = ''
|
|
if 'model_name' not in st.session_state:
|
|
st.session_state.model_name = ''
|
|
|
|
|
|
with st.sidebar:
|
|
st.header("π API Keys and Model Configuration")
|
|
st.session_state.together_api_key = st.text_input("Together AI API Key", type="password")
|
|
st.info("π‘ Everyone gets a free $1 credit by Together AI - AI Acceleration Cloud platform")
|
|
st.markdown("[Get Together AI API Key](https://api.together.ai/signin)")
|
|
|
|
st.session_state.e2b_api_key = st.text_input("Enter E2B API Key", type="password")
|
|
st.markdown("[Get E2B API Key](https://e2b.dev/docs/legacy/getting-started/api-key)")
|
|
|
|
|
|
model_options = {
|
|
"Meta-Llama 3.1 405B": "meta-llama/Meta-Llama-3.1-405B-Instruct-Turbo",
|
|
"DeepSeek V3": "deepseek-ai/DeepSeek-V3",
|
|
"Qwen 2.5 7B": "Qwen/Qwen2.5-7B-Instruct-Turbo",
|
|
"Meta-Llama 3.3 70B": "meta-llama/Llama-3.3-70B-Instruct-Turbo"
|
|
}
|
|
st.session_state.model_name = st.selectbox(
|
|
"Select Model",
|
|
options=list(model_options.keys()),
|
|
index=0
|
|
)
|
|
st.session_state.model_name = model_options[st.session_state.model_name]
|
|
|
|
|
|
col1, col2 = st.columns([1, 2])
|
|
|
|
with col1:
|
|
st.header("π Upload Dataset")
|
|
uploaded_file = st.file_uploader("Choose a CSV file", type="csv", key="file_uploader")
|
|
|
|
if uploaded_file is not None:
|
|
|
|
df = pd.read_csv(uploaded_file)
|
|
st.write("### Dataset Preview")
|
|
show_full = st.checkbox("Show full dataset")
|
|
if show_full:
|
|
st.dataframe(df)
|
|
else:
|
|
st.write("Preview (first 5 rows):")
|
|
st.dataframe(df.head())
|
|
|
|
with col2:
|
|
if uploaded_file is not None:
|
|
st.header("β Ask a Question")
|
|
query = st.text_area(
|
|
"What would you like to know about your data?",
|
|
"Can you compare the average cost for two people between different categories?",
|
|
height=100
|
|
)
|
|
|
|
if st.button("Analyze", type="primary", key="analyze_button"):
|
|
if not st.session_state.together_api_key or not st.session_state.e2b_api_key:
|
|
st.error("Please enter both API keys in the sidebar.")
|
|
else:
|
|
with Sandbox(api_key=st.session_state.e2b_api_key) as code_interpreter:
|
|
|
|
dataset_path = upload_dataset(code_interpreter, uploaded_file)
|
|
|
|
|
|
code_results, llm_response = chat_with_llm(code_interpreter, query, dataset_path)
|
|
|
|
|
|
st.header("π€ AI Response")
|
|
st.write(llm_response)
|
|
|
|
|
|
if code_results:
|
|
st.header("π Analysis Results")
|
|
for result in code_results:
|
|
if hasattr(result, 'png') and result.png:
|
|
|
|
png_data = base64.b64decode(result.png)
|
|
|
|
|
|
image = Image.open(BytesIO(png_data))
|
|
st.image(image, caption="Generated Visualization", use_container_width=True)
|
|
elif hasattr(result, 'figure'):
|
|
fig = result.figure
|
|
st.pyplot(fig)
|
|
elif hasattr(result, 'show'):
|
|
st.plotly_chart(result)
|
|
elif isinstance(result, (pd.DataFrame, pd.Series)):
|
|
st.dataframe(result)
|
|
else:
|
|
st.write(result)
|
|
|
|
if __name__ == "__main__":
|
|
main() |