File size: 6,502 Bytes
c3cdef4 8e9f822 c3cdef4 eef1d6e c3cdef4 fdd250f c3cdef4 eef1d6e c3cdef4 eef1d6e c3cdef4 4d56490 fdd250f 560fc2f 8e9f822 c3cdef4 2c46896 c3cdef4 a685652 2c46896 8e9f822 c3cdef4 8e9f822 c3cdef4 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 |
import asyncio
import gradio as gr
from autogen.runtime_logging import start, stop
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import MaxMessageTermination, TextMentionTermination
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_agentchat.base import TaskResult
# Configuration
LOG_FILE = "team_runtime.log"
def create_llm_config(api_key: str, model: str = "gpt-4o") -> dict:
    """Build the keyword arguments used to construct the model client.

    Args:
        api_key: OpenAI API key supplied by the user.
        model: Name of the chat-completion model to use. Defaults to
            "gpt-4o", the value that used to be hard-coded here.

    Returns:
        A dict with the keys "model", "api_key" and "cache_seed".
    """
    return {
        "model": model,
        "api_key": api_key,
        # NOTE(review): "cache_seed" looks like a key from the legacy autogen
        # llm_config format; confirm the client this dict is unpacked into
        # accepts (or ignores) it.
        "cache_seed": None,
    }
# Create the team with primary and critic agents
def create_team(llm_config, primary_system_message, critic_system_message):
    """Assemble the primary/critic round-robin team.

    Args:
        llm_config: Keyword arguments unpacked into OpenAIChatCompletionClient.
        primary_system_message: System prompt for the "primary" agent.
        critic_system_message: System prompt for the "critic" agent.

    Returns:
        A ``(team, model_client)`` tuple; the client is returned so callers
        can reuse it for further agents.
    """
    client = OpenAIChatCompletionClient(**llm_config)

    # Both agents share one model client; only name and system prompt differ.
    # "primary" is listed first, "critic" second.
    roles = (
        ("primary", primary_system_message),
        ("critic", critic_system_message),
    )
    participants = [
        AssistantAgent(name, model_client=client, system_message=prompt)
        for name, prompt in roles
    ]

    # Stop on whichever comes first: the 10-message cap or an "APPROVE" mention.
    stop_condition = MaxMessageTermination(max_messages=10) | TextMentionTermination("APPROVE")

    return RoundRobinGroupChat(participants, termination_condition=stop_condition), client
# Function to stream the task through the workflow
async def async_stream_task(task_message, api_key, primary_system_message, critic_system_message, documentation_system_message):
    """Run the primary/critic loop and, once approved, the documentation agent.

    This is an async generator. It yields ``(source, content)`` pairs for
    every message produced by the team, including the critic's approving
    message, followed by the messages produced by the documentation agent
    when the critic approved.

    Args:
        task_message: The user's task, passed to the team as its task.
        api_key: OpenAI API key used to build the model client.
        primary_system_message: System prompt for the primary agent.
        critic_system_message: System prompt for the critic agent.
        documentation_system_message: System prompt for the documentation agent.

    Yields:
        Tuples of ``(source, content)`` taken from the streamed messages.
    """
    # Start logging
    logging_session_id = start(logger_type="file", config={"filename": LOG_FILE})
    print(f"Logging session ID: {logging_session_id}")

    # Everything after start() lives inside the try block so that stop() runs
    # even when building the team fails.
    try:
        llm_config = create_llm_config(api_key)
        team, model_client = create_team(llm_config, primary_system_message, critic_system_message)

        approved = False            # Set once the critic replies with "APPROVE"
        last_primary_output = None  # Most recent content produced by the primary agent

        async for message in team.run_stream(task=task_message):
            # Skip stream items that are not messages (no source/content),
            # e.g. the closing task result.
            if not (hasattr(message, "source") and hasattr(message, "content")):
                continue

            content = message.content
            if message.source == "primary":
                # Remember what the critic is about to review; this is what
                # gets documented after approval, not the user's task text.
                last_primary_output = content

            # Forward the message before checking for approval so the
            # critic's approving reply is shown to the user as well.
            yield message.source, content

            # Handle critic's approval (only plain-text content can carry the keyword)
            if message.source == "critic" and isinstance(content, str) and "APPROVE" in content:
                print("Critic approved the response. Handing off to Documentation Agent...")
                approved = True
                break

        # Trigger Documentation Agent if approved
        if approved and last_primary_output:
            documentation_agent = AssistantAgent(
                "documentation",
                model_client=model_client,
                system_message=documentation_system_message,
            )
            doc_task = f"Generate a '--help' message for the following code:\n\n{last_primary_output}"

            # Messages already forwarded from the stream. The closing
            # TaskResult lists the run's messages again, so anything in here
            # must not be yielded a second time.
            already_streamed = []
            async for doc_message in documentation_agent.run_stream(task=doc_task):
                if isinstance(doc_message, TaskResult):
                    for msg in doc_message.messages:
                        if not any(msg is seen for seen in already_streamed):
                            yield msg.source, msg.content
                else:
                    already_streamed.append(doc_message)
                    yield doc_message.source, doc_message.content
    finally:
        # Stop logging
        stop()
# Gradio interface function
async def chat_interface(api_key, primary_system_message, critic_system_message, documentation_system_message, task_message):
    """Feed the Gradio outputs from the agent workflow stream.

    This is an async generator. After every streamed message it yields a
    3-tuple with the newline-joined transcripts of the primary, critic and
    documentation agents, in that order. Messages from any other source are
    not added to a transcript.
    """
    panel_order = ("primary", "critic", "documentation")
    transcripts = {name: [] for name in panel_order}

    stream = async_stream_task(
        task_message,
        api_key,
        primary_system_message,
        critic_system_message,
        documentation_system_message,
    )
    async for source, output in stream:
        # Route the message to its agent's transcript, if it has one
        bucket = transcripts.get(source)
        if bucket is not None:
            bucket.append(output)

        # Emit the current state of all three transcripts
        yield tuple("\n".join(transcripts[name]) for name in panel_order)
# Gradio interface
# The inputs are listed in the same order as chat_interface's parameters, and
# the three outputs match the (primary, critic, documentation) tuple it yields.
# User-facing texts name the keyword "APPROVE" because that is the exact
# substring the workflow looks for; "APPROVAL" does not contain it.
iface = gr.Interface(
    fn=chat_interface,
    inputs=[
        gr.Textbox(label="OpenAI API Key", type="password", placeholder="Enter your OpenAI API Key"),
        gr.Textbox(label="Primary Agent System Message", placeholder="Enter the system message for the primary agent", value="You are a creative assistant focused on producing high-quality code."),
        gr.Textbox(label="Critic Agent System Message", placeholder="Enter the system message for the critic agent (requires APPROVE tag!)", value="You are a critic assistant highly skilled in evaluating the quality of a given code or response. Provide constructive feedback and respond with 'APPROVE' once the feedback is addressed. Never produce any code or other output yourself, only provide feedback!"),
        gr.Textbox(label="Documentation Agent System Message", placeholder="Enter the system message for the documentation agent", value="You are a documentation assistant. Write a short and concise '--help' message for the provided code."),
        gr.Textbox(label="Task Message", placeholder="Code a random password generator using python."),
    ],
    outputs=[
        gr.Textbox(label="The Primary Assistant Messages"),
        gr.Textbox(label="The Critics Assistant Messages"),
        gr.Textbox(label="The Documentation Assistant Message"),
    ],
    title="Team Workflow with Documentation Agent and Hard Cap",
    description="""Collaborative workflow between Primary, Critic, and Documentation agents.
1. The user can send a prompt to the primary agent.
2. The response will then be evaluated by the critic, which either sends feedback back to the primary agent or gives the APPROVE sign.
3. If the APPROVE sign is given, the documentation agent is asked to write a short documentation for the code (that has been approved by the critic and generated by the primary agent).
4. (Note: There is a hard cap of 10 messages for the critic to approve the output of the primary agent. If it fails to do so the workflow is interrupted to prevent long loops)""",
)

# Launch the app
if __name__ == "__main__":
    iface.launch(share=True)
|