|
import asyncio |
|
import gradio as gr |
|
from autogen.runtime_logging import start, stop |
|
from autogen_agentchat.agents import AssistantAgent |
|
from autogen_agentchat.conditions import MaxMessageTermination, TextMentionTermination |
|
from autogen_agentchat.teams import RoundRobinGroupChat |
|
from autogen_ext.models.openai import OpenAIChatCompletionClient |
|
from autogen_agentchat.base import TaskResult |
|
|
|
|
|
LOG_FILE = "team_runtime.log" |
|
|
|
def create_llm_config(api_key): |
|
return { |
|
"model": "gpt-4o", |
|
"api_key": api_key, |
|
"cache_seed": None |
|
} |
|
|
|
|
|
def create_team(llm_config, primary_system_message, critic_system_message): |
|
model_client = OpenAIChatCompletionClient(**llm_config) |
|
|
|
primary_agent = AssistantAgent( |
|
"primary", |
|
model_client=model_client, |
|
system_message=primary_system_message, |
|
) |
|
|
|
critic_agent = AssistantAgent( |
|
"critic", |
|
model_client=model_client, |
|
system_message=critic_system_message |
|
) |
|
|
|
|
|
max_message_termination = MaxMessageTermination(max_messages=10) |
|
text_termination = TextMentionTermination("APPROVE") |
|
combined_termination = max_message_termination | text_termination |
|
|
|
team = RoundRobinGroupChat([primary_agent, critic_agent], termination_condition=combined_termination) |
|
return team, model_client |
|
|
|
|
|
async def async_stream_task(task_message, api_key, primary_system_message, critic_system_message, documentation_system_message): |
|
|
|
logging_session_id = start(logger_type="file", config={"filename": LOG_FILE}) |
|
print(f"Logging session ID: {logging_session_id}") |
|
|
|
llm_config = create_llm_config(api_key) |
|
team, model_client = create_team(llm_config, primary_system_message, critic_system_message) |
|
documentation_triggered = False |
|
final_output = None |
|
|
|
try: |
|
async for message in team.run_stream(task=task_message): |
|
if hasattr(message, "source") and hasattr(message, "content"): |
|
|
|
if message.source == "critic" and "APPROVE" in message.content: |
|
print("Critic approved the response. Handing off to Documentation Agent...") |
|
documentation_triggered = True |
|
final_output = task_message |
|
break |
|
yield message.source, message.content |
|
|
|
|
|
if documentation_triggered and final_output: |
|
documentation_agent = AssistantAgent( |
|
"documentation", |
|
model_client=model_client, |
|
system_message=documentation_system_message, |
|
) |
|
doc_task = f"Generate a '--help' message for the following code:\n\n{final_output}" |
|
async for doc_message in documentation_agent.run_stream(task=doc_task): |
|
if isinstance(doc_message, TaskResult): |
|
|
|
for msg in doc_message.messages: |
|
yield msg.source, msg.content |
|
else: |
|
yield doc_message.source, doc_message.content |
|
|
|
finally: |
|
|
|
stop() |
|
|
|
|
|
async def chat_interface(api_key, primary_system_message, critic_system_message, documentation_system_message, task_message): |
|
primary_messages = [] |
|
critic_messages = [] |
|
documentation_messages = [] |
|
|
|
|
|
async for source, output in async_stream_task(task_message, api_key, primary_system_message, critic_system_message, documentation_system_message): |
|
if source == "primary": |
|
primary_messages.append(output) |
|
elif source == "critic": |
|
critic_messages.append(output) |
|
elif source == "documentation": |
|
documentation_messages.append(output) |
|
|
|
|
|
yield ( |
|
"\n".join(primary_messages), |
|
"\n".join(critic_messages), |
|
"\n".join(documentation_messages), |
|
) |
|
|
|
|
|
iface = gr.Interface( |
|
fn=chat_interface, |
|
inputs=[ |
|
gr.Textbox(label="OpenAI API Key", type="password", placeholder="Enter your OpenAI API Key"), |
|
gr.Textbox(label="Primary Agent System Message", placeholder="Enter the system message for the primary agent", value="You are a creative assistant focused on producing high-quality code."), |
|
gr.Textbox(label="Critic Agent System Message", placeholder="Enter the system message for the critic agent (requires APPROVAL tag!)", value="You are a critic assistant highly skilled in evaluating the quality of a given code or response. Provide constructive feedback and respond with 'APPROVE' once the feedback is addressed."), |
|
gr.Textbox(label="Documentation Agent System Message", placeholder="Enter the system message for the documentation agent", value="You are a documentation assistant. Write a short and concise '--help' message for the provided code."), |
|
gr.Textbox(label="Task Message", placeholder="Code a random password generator using python."), |
|
], |
|
outputs=[ |
|
gr.Textbox(label="The Primary Assistant Messages"), |
|
gr.Textbox(label="The Critics Assistant Messages"), |
|
gr.Textbox(label="The Documentation Assistant Message"), |
|
], |
|
title="Team Workflow with Documentation Agent and Hard Cap", |
|
description="""Collaborative workflow between Primary, Critic, and Documentation agents. |
|
1. The user can send a prompt to the primary agent. |
|
2. The response will then be evaluated by the critic, which either sends feedback back to the primary agent or gives the APPROVAL sign. |
|
3. If the APPROVAL sign is given, the documentation agent is asked to write a short documentation for the code (that has been approved by the critic and generated by the priamry agent. |
|
4. (Note: There is a hard cap of 10 messages for the critic to approve the output of the primary agent. If it fails to do so the workflow is interrupted to prevent long loops)""" |
|
) |
|
|
|
|
|
if __name__ == "__main__": |
|
iface.launch(share=True) |
|
|