import subprocess
import os
import torch
import gradio as gr
import spaces
from datetime import datetime
from transformers import AutoModelForCausalLM, AutoTokenizer
from duckduckgo_search import DDGS
# ----------------------- Setup & Dependency Installation ----------------------- #
try:
    subprocess.run(['git', 'lfs', 'install'], check=True)
    if not os.path.exists('Kokoro-82M'):
        subprocess.run(['git', 'clone', 'https://huggingface.co./hexgrad/Kokoro-82M'], check=True)
    try:
        subprocess.run(['apt-get', 'update'], check=True)
        subprocess.run(['apt-get', 'install', '-y', 'espeak'], check=True)
    except subprocess.CalledProcessError:
        print("Warning: Could not install espeak. Trying espeak-ng...")
        try:
            subprocess.run(['apt-get', 'install', '-y', 'espeak-ng'], check=True)
        except subprocess.CalledProcessError:
            print("Warning: Could not install espeak or espeak-ng. TTS functionality may be limited.")
except Exception as e:
    print(f"Warning: Initial setup error: {str(e)}")
    print("Continuing with limited functionality...")
# ----------------------- Global Variables ----------------------- #
# Voice-related variables are no longer used.
# VOICE_CHOICES = { ... } --> removed
# ----------------------- Model and Tokenizer Initialization ----------------------- #
model_name = "deepseek-ai/DeepSeek-R1-Distill-Llama-8B"
tokenizer = AutoTokenizer.from_pretrained(model_name)
tokenizer.pad_token = tokenizer.eos_token
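# Llama-family tokenizers ship without a dedicated pad token, so EOS is reused
# for padding here (a common convention for causal LMs).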
def init_models():
    """Load the DeepSeek model with automatic device placement and fp16 weights."""
    model = AutoModelForCausalLM.from_pretrained(
        model_name,
        device_map="auto",
        offload_folder="offload",
        low_cpu_mem_usage=True,
        torch_dtype=torch.float16
    )
    return model
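# Note: the model is loaded inside the GPU-decorated call below rather than at
# import time; on ZeroGPU Spaces the GPU is only attached while the decorated
# function runs, which is presumably why loading is deferred to init_models().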
# ----------------------- Kokoro TTS Initialization ----------------------- #
# Voice feature removed: the TTS initialization code is no longer used.
TTS_ENABLED = False
# ----------------------- Web Search Functions ----------------------- #
def get_web_results(query, max_results=5):
    """Run a DuckDuckGo text search and normalize the results."""
    try:
        with DDGS() as ddgs:
            results = list(ddgs.text(query, max_results=max_results))
            return [{
                "title": result.get("title", ""),
                "snippet": result.get("body", ""),
                "url": result.get("href", ""),
                "date": result.get("published", "")
            } for result in results]
    except Exception:
        return []
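# Illustrative shape of a get_web_results() return value (values are made up):
# [{"title": "Example", "snippet": "Body text...",
#   "url": "https://example.com", "date": "2024-01-01"}]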
def format_prompt(query, context):
    """Build a prompt that asks the model for a concise, summarized answer based on the web search results."""
    current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    context_lines = '\n'.join([f'- {res["title"]}: {res["snippet"]}' for res in context])
    return f"""You are an intelligent search assistant. Your task is to provide a concise, clear summary answer to the user's query based solely on the provided web context.
Current Time: {current_time}
Query: {query}
Web Context:
{context_lines}
Please provide a summary answer in markdown format, including citations such as [1], [2], etc. in your answer if needed.
Answer:"""
def format_sources(web_results):
    """Render the search results as an HTML block.
    The markup was reconstructed to match the .source-* CSS classes defined below."""
    if not web_results:
        return "<div>No sources available</div>"
    sources_html = "<div class='sources-container'>"
    for i, res in enumerate(web_results, 1):
        title = res["title"] or "Source"
        date = f"<span class='source-date'>{res['date']}</span>" if res['date'] else ""
        sources_html += f"""
        <div class='source-item'>
            <div class='source-number'>[{i}]</div>
            <div class='source-content'>
                <a href="{res['url']}" target="_blank" class='source-title'>{title}</a>
                {date}
                <div class='source-snippet'>{res['snippet'][:150]}...</div>
            </div>
        </div>"""
    sources_html += "</div>"
    return sources_html
# ----------------------- Answer Generation ----------------------- #
@spaces.GPU(duration=30)
def generate_answer(prompt):
    """Generate a completion for the prompt; runs on GPU for up to 30 seconds."""
    model = init_models()
    inputs = tokenizer(
        prompt,
        return_tensors="pt",
        padding=True,
        truncation=True,
        max_length=512,
        return_attention_mask=True
    ).to(model.device)
    outputs = model.generate(
        inputs.input_ids,
        attention_mask=inputs.attention_mask,
        max_new_tokens=256,
        temperature=0.7,
        top_p=0.95,
        pad_token_id=tokenizer.eos_token_id,
        do_sample=True
    )
    # decode() returns the prompt followed by the completion; the caller strips
    # everything up to the final "Answer:" marker.
    return tokenizer.decode(outputs[0], skip_special_tokens=True)
# ----------------------- Process Query and Output Summary ----------------------- #
def process_query(query, history):
    try:
        if history is None:
            history = []
        # Fetch web search results
        web_results = get_web_results(query)
        sources_html = format_sources(web_results)
        # Show an intermediate "searching" state
        current_history = history + [[query, "*Searching...*"]]
        yield {
            answer_output: gr.Markdown("*Searching & Summarizing...*"),
            sources_output: gr.HTML(sources_html),
            search_btn: gr.Button("Searching...", interactive=False),
            chat_history_display: current_history
        }
        # Build the prompt so the model summarizes the web results
        prompt_text = format_prompt(query, web_results)
        answer = generate_answer(prompt_text)
        final_answer = answer.split("Answer:")[-1].strip()
        updated_history = history + [[query, final_answer]]
        yield {
            answer_output: gr.Markdown(final_answer),
            sources_output: gr.HTML(sources_html),
            search_btn: gr.Button("Search", interactive=True),
            chat_history_display: updated_history
        }
    except Exception as e:
        error_message = str(e)
        if "GPU quota" in error_message:
            error_message = "⚠️ GPU quota exceeded. Please try again later when the daily quota resets."
        yield {
            answer_output: gr.Markdown(f"Error: {error_message}"),
            sources_output: gr.HTML(""),
            search_btn: gr.Button("Search", interactive=True),
            chat_history_display: history + [[query, f"*Error: {error_message}*"]]
        }
# ----------------------- Custom CSS for Bright UI ----------------------- #
css = """
.gradio-container {
max-width: 1200px !important;
background-color: #ffffff !important;
padding: 20px;
border-radius: 12px;
}
#header {
text-align: center;
padding: 2rem 0;
background: #e3f2fd;
border-radius: 12px;
color: #333333;
margin-bottom: 2rem;
}
#header h1 {
font-size: 2.5rem;
margin-bottom: 0.5rem;
}
.search-container {
background: #f8f9fa;
border-radius: 12px;
padding: 1.5rem;
margin-bottom: 1rem;
border: 1px solid #e0e0e0;
}
.search-box {
padding: 1rem;
background: #ffffff;
border-radius: 8px;
margin-bottom: 1rem;
border: 1px solid #e0e0e0;
}
.search-box input[type="text"] {
background: #ffffff !important;
border: 1px solid #cccccc !important;
color: #333333 !important;
border-radius: 8px !important;
}
.search-box input[type="text"]::placeholder {
color: #888888 !important;
}
.search-box button {
background: #007bff !important;
border: none !important;
}
.results-container {
background: #ffffff;
border-radius: 8px;
padding: 1.5rem;
margin-top: 1rem;
border: 1px solid #e0e0e0;
}
.answer-box {
background: #f1f1f1;
border-radius: 8px;
padding: 1.5rem;
color: #333333;
margin-bottom: 1rem;
}
.answer-box p {
color: #555555;
line-height: 1.6;
}
.sources-container {
margin-top: 1rem;
background: #ffffff;
border-radius: 8px;
padding: 1rem;
border: 1px solid #e0e0e0;
}
.source-item {
display: flex;
padding: 12px;
margin: 8px 0;
background: #f8f9fa;
border-radius: 8px;
transition: all 0.2s;
}
.source-item:hover {
background: #e9ecef;
}
.source-number {
font-weight: bold;
margin-right: 12px;
color: #007bff;
}
.source-content {
flex: 1;
}
.source-title {
color: #007bff;
font-weight: 500;
text-decoration: none;
display: block;
margin-bottom: 4px;
}
.source-date {
color: #888888;
font-size: 0.9em;
margin-left: 8px;
}
.source-snippet {
color: #555555;
font-size: 0.9em;
line-height: 1.4;
}
.chat-history {
max-height: 400px;
overflow-y: auto;
padding: 1rem;
background: #ffffff;
border-radius: 8px;
margin-top: 1rem;
border: 1px solid #e0e0e0;
}
footer {
text-align: center;
padding: 1rem 0;
font-size: 0.9em;
color: #666666;
}
"""
# ----------------------- Gradio Interface ----------------------- #
with gr.Blocks(title="AI Search Assistant", css=css) as demo:
    chat_history = gr.State([])
    with gr.Column(elem_id="header"):
        gr.Markdown("# 🔍 AI Search Assistant")
        gr.Markdown("### Powered by DeepSeek & Real-time Web Results")
    with gr.Column(elem_classes="search-container"):
        with gr.Row(elem_classes="search-box"):
            search_input = gr.Textbox(
                label="",
                placeholder="Ask anything...",
                scale=5,
                container=False
            )
            search_btn = gr.Button("Search", variant="primary", scale=1)
        with gr.Row(elem_classes="results-container"):
            with gr.Column(scale=2):
                with gr.Column(elem_classes="answer-box"):
                    answer_output = gr.Markdown()
                with gr.Accordion("Chat History", open=False):
                    chat_history_display = gr.Chatbot(elem_classes="chat-history")
            with gr.Column(scale=1):
                with gr.Column():
                    gr.Markdown("### Sources")
                    sources_output = gr.HTML()
        with gr.Row():
            gr.Examples(
                examples=[
                    "musk explores blockchain for doge",
                    "nvidia to launch new gaming card",
                    "What are the best practices for sustainable living?",
                    "How is climate change affecting ocean ecosystems?"
                ],
                inputs=search_input,
                label="Try these examples"
            )
    search_btn.click(
        fn=process_query,
        inputs=[search_input, chat_history],
        outputs=[answer_output, sources_output, search_btn, chat_history_display]
    )
    search_input.submit(
        fn=process_query,
        inputs=[search_input, chat_history],
        outputs=[answer_output, sources_output, search_btn, chat_history_display]
    )
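    # process_query is a generator, so Gradio streams each yielded dict of
    # component updates: first the "Searching..." placeholder, then the final
    # answer (or an error state).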
if __name__ == "__main__":
    demo.launch(share=True)