import asyncio import gradio as gr from autogen.runtime_logging import start, stop from autogen_agentchat.agents import AssistantAgent from autogen_agentchat.conditions import MaxMessageTermination, TextMentionTermination from autogen_agentchat.teams import RoundRobinGroupChat from autogen_ext.models.openai import OpenAIChatCompletionClient from autogen_agentchat.base import TaskResult # Configuration LOG_FILE = "team_runtime.log" def create_llm_config(api_key): return { "model": "gpt-4o", "api_key": api_key, "cache_seed": None } # Create the team with primary and critic agents def create_team(llm_config, primary_system_message, critic_system_message): model_client = OpenAIChatCompletionClient(**llm_config) primary_agent = AssistantAgent( "primary", model_client=model_client, system_message=primary_system_message, ) critic_agent = AssistantAgent( "critic", model_client=model_client, system_message=critic_system_message ) # Set termination conditions (10-message cap OR "APPROVE" detected) max_message_termination = MaxMessageTermination(max_messages=10) text_termination = TextMentionTermination("APPROVE") combined_termination = max_message_termination | text_termination team = RoundRobinGroupChat([primary_agent, critic_agent], termination_condition=combined_termination) return team, model_client # Function to stream the task through the workflow async def async_stream_task(task_message, api_key, primary_system_message, critic_system_message, documentation_system_message): # Start logging logging_session_id = start(logger_type="file", config={"filename": LOG_FILE}) print(f"Logging session ID: {logging_session_id}") llm_config = create_llm_config(api_key) team, model_client = create_team(llm_config, primary_system_message, critic_system_message) documentation_triggered = False # Track if documentation agent was triggered final_output = None # Store the final approved output try: async for message in team.run_stream(task=task_message): if hasattr(message, "source") and hasattr(message, "content"): # Handle critic's approval if message.source == "critic" and "APPROVE" in message.content: print("Critic approved the response. Handing off to Documentation Agent...") documentation_triggered = True final_output = task_message # Capture the final approved output break yield message.source, message.content # Trigger Documentation Agent if approved if documentation_triggered and final_output: documentation_agent = AssistantAgent( "documentation", model_client=model_client, system_message=documentation_system_message, ) doc_task = f"Generate a '--help' message for the following code:\n\n{final_output}" async for doc_message in documentation_agent.run_stream(task=doc_task): if isinstance(doc_message, TaskResult): # Extract messages from TaskResult for msg in doc_message.messages: yield msg.source, msg.content else: yield doc_message.source, doc_message.content finally: # Stop logging stop() # Gradio interface function async def chat_interface(api_key, primary_system_message, critic_system_message, documentation_system_message, task_message): primary_messages = [] critic_messages = [] documentation_messages = [] # Append new messages while streaming async for source, output in async_stream_task(task_message, api_key, primary_system_message, critic_system_message, documentation_system_message): if source == "primary": primary_messages.append(output) elif source == "critic": critic_messages.append(output) elif source == "documentation": documentation_messages.append(output) # Return all outputs yield ( "\n".join(primary_messages), "\n".join(critic_messages), "\n".join(documentation_messages), ) # Gradio interface iface = gr.Interface( fn=chat_interface, inputs=[ gr.Textbox(label="OpenAI API Key", type="password", placeholder="Enter your OpenAI API Key"), gr.Textbox(label="Primary Agent System Message", placeholder="Enter the system message for the primary agent", value="You are a creative assistant focused on producing high-quality code."), gr.Textbox(label="Critic Agent System Message", placeholder="Enter the system message for the critic agent (requires APPROVAL tag!)", value="You are a critic assistant highly skilled in evaluating the quality of a given code or response. Provide constructive feedback and respond with 'APPROVE' once the feedback is addressed. Never produce any code or other output yourself, only provide feedback!"), gr.Textbox(label="Documentation Agent System Message", placeholder="Enter the system message for the documentation agent", value="You are a documentation assistant. Write a short and concise '--help' message for the provided code."), gr.Textbox(label="Task Message", placeholder="Code a random password generator using python."), ], outputs=[ gr.Textbox(label="The Primary Assistant Messages"), gr.Textbox(label="The Critics Assistant Messages"), gr.Textbox(label="The Documentation Assistant Message"), ], title="Team Workflow with Documentation Agent and Hard Cap", description="""Collaborative workflow between Primary, Critic, and Documentation agents. 1. The user can send a prompt to the primary agent. 2. The response will then be evaluated by the critic, which either sends feedback back to the primary agent or gives the APPROVAL sign. 3. If the APPROVAL sign is given, the documentation agent is asked to write a short documentation for the code (that has been approved by the critic and generated by the priamry agent. 4. (Note: There is a hard cap of 10 messages for the critic to approve the output of the primary agent. If it fails to do so the workflow is interrupted to prevent long loops)""" ) # Launch the app if __name__ == "__main__": iface.launch(share=True)