Chris4K commited on
Commit
1e31715
·
verified ·
1 Parent(s): 14773a9

Create app.py

Browse files
Files changed (1) hide show
  1. app.py +370 -0
app.py ADDED
@@ -0,0 +1,370 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # prompt: agent.run("Add the results of scraping: https://www.amazon.de/Amazon-Cola-24-330ml/dp/B0B2QFB69F/ref=sr_1_1_ffob_sspa?dib=eyJ2IjoiMSJ9.Ammc6GrHRevDKBSZX9_vNS5j3Kc1ZW2R4jISx9htSBAc0WWFC1xX5qnohoEQjmvNqQyWIr6hnMbFad3QuwPMVG8F_nZbwnpBcHL89OZsU2XzkSha-clTmgJLUUh7Z96_98HOe9hOif82mXyrL7ZTnbygPSbm-t6FDAfslLesKfij79QL7-a2RSOKVPcJRFR1DLUamaHfmhyN5c_rujFjb2X1rQSXg6NWCnOdgU2r1gzEa54bU8bxeQnX-vMsRMGEw4entZYP_Oh85pEImPU_lS2Awqr-sG_RgaV0Wuzfmdw.XA9kTWHZQvmhT2BoQWxRNix2TJe8EoeyjiSoQtFx1yY&dib_tag=se&keywords=Cola&qid=1738167189&rdc=1&sr=8-1-spons&sp_csd=d2lkZ2V0TmFtZT1zcF9hdGY&th=1 to a csv file")
2
+ #!pip install -q smolagents transformers sentence_transformers gradio
3
+ from smolagents import CodeAgent, HfApiModel, tool
4
+ import requests
5
+ from bs4 import BeautifulSoup
6
+ import pandas as pd
7
+ import gradio as gr
8
+ from typing import List, Dict, Optional
9
+ import time
10
+ import random
11
+ import re
12
+ from concurrent.futures import ThreadPoolExecutor
13
+ import json
14
+
15
+
16
+ import pandas as pd
17
+ ####
18
+ ####
19
+ ####
20
@tool
def amazon_scraper(keyword: str, mandatory_columns: Optional[List[str]] = None, max_products: int = 10) -> Dict:
    """
    Enhanced Amazon scraper that gets both listing and detailed product information.

    Args:
        keyword: Search term for Amazon products
        mandatory_columns: List of attribute names guaranteed to appear in every
            product dict (missing values are filled with 'N/A')
        max_products: Maximum number of products to scrape (default: 10)

    Returns:
        Dictionary with 'products' (list of per-product dicts) and 'columns'
        (union of all product keys), or a dict with an 'error' key on failure.
    """
    if mandatory_columns is None:
        mandatory_columns = ['title', 'price', 'rating', 'reviews']

    headers = {
        # Bug fix: the original shipped a literal "Chrome/VERSION" placeholder,
        # which is an obvious bot fingerprint; use a concrete version string.
        'User-Agent': "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.5',
        'Connection': 'keep-alive',
    }

    search_url = f"https://www.amazon.com/s?k={keyword.replace(' ', '+')}"

    try:
        # Robustness: a timeout prevents a stalled connection from hanging the tool.
        response = requests.get(search_url, headers=headers, timeout=15)
        response.raise_for_status()
        soup = BeautifulSoup(response.content, 'html.parser')

        items = soup.find_all('div', attrs={'data-component-type': 's-search-result'})

        if not items:
            return {'error': 'No products found for the given search term'}

        products = []
        for item in items[:max_products]:
            try:
                product = {}

                # NOTE(review): these selectors assume Amazon's current search-result
                # markup (e.g. the "title-instructions-style" class) — verify periodically.
                title_elem = item.find('div', class_="title-instructions-style")
                product['title'] = title_elem.text.strip() if title_elem else 'N/A'

                price_elem = item.find('span', class_='a-offscreen')
                product['price'] = price_elem.text.strip() if price_elem else 'N/A'

                rating_elem = item.find('span', class_='a-icon-alt')
                product['rating'] = rating_elem.text.split(' ')[0] if rating_elem else 'N/A'

                reviews_elem = item.find('span', {'class': 'a-size-base', 'dir': 'auto'})
                product['reviews'] = reviews_elem.text.strip() if reviews_elem else '0'

                # Get product URL and, when available, enrich with detail-page data.
                url_elem = item.find('a', class_='a-link-normal s-no-outline')
                if url_elem and 'href' in url_elem.attrs:
                    product_url = 'https://www.amazon.com' + url_elem['href']
                    product['url'] = product_url

                    details = scrape_product_details(product_url, headers)
                    product.update(details)

                # Bug fix: mandatory_columns was accepted but never used.
                # Guarantee every requested column exists in each product dict.
                for column in mandatory_columns:
                    product.setdefault(column, 'N/A')

                products.append(product)

            except Exception as e:
                # Best-effort per item: one broken listing should not abort the batch.
                print(f"Error processing item: {str(e)}")
                continue

        if not products:
            return {'error': 'Failed to extract product information'}

        # Bug fix: products may have differing keys (detail scraping is conditional),
        # so report the union of all keys instead of just the first product's keys.
        columns: List[str] = []
        for product in products:
            for key in product:
                if key not in columns:
                    columns.append(key)

        return {'products': products, 'columns': columns}

    except requests.RequestException as e:
        return {'error': f'Network error: {str(e)}'}
    except Exception as e:
        return {'error': f'Unexpected error: {str(e)}'}
99
+
100
+
101
+
102
+ from typing import Optional, Dict
103
+ import requests
104
+ from bs4 import BeautifulSoup
105
+ from smolagents import tool # Ensure you import @tool
106
+
107
@tool
def scrape_product_details(url: str, headers: Optional[Dict[str, str]] = None) -> Dict[str, str]:
    """
    Scrapes product details from an Amazon product page.

    Args:
        url: The URL of the Amazon product page to scrape.
        headers: HTTP headers to include in the request. Defaults to None
            (a standard desktop-browser User-Agent is used).

    Returns:
        Dict[str, str]: A dictionary containing:
            - 'title': Product title
            - 'price': Product price
            - 'description': Product description
            - 'bullet_points': Bullet point features (comma-separated string)
            - 'average_rating': Customer rating
            - 'total_reviews': Number of reviews
            - 'image_link': URL of the main product image
        On a non-200 response a dict with a single 'error' key is returned.
    """
    if headers is None:
        headers = {
            # Bug fix: replaced the literal "Chrome/VERSION" placeholder with a real version.
            'User-Agent': "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
        }

    # Robustness: a timeout prevents an unresponsive server from hanging the tool.
    response = requests.get(url, headers=headers, timeout=15)

    if response.status_code != 200:
        return {'error': f'Failed to retrieve the page. Status code: {response.status_code}'}

    soup = BeautifulSoup(response.content, 'html.parser')

    # Extract the product title
    product_title = soup.find('span', {'id': 'productTitle'})
    product_title = product_title.get_text(strip=True) if product_title else 'Title not found'

    # Extract the product price
    product_price = soup.find('span', {'class': 'a-price-whole'})
    product_price = product_price.get_text(strip=True) if product_price else 'Price not found'

    # Extract the product description
    product_description = soup.find('div', {'id': 'productDescription'})
    product_description = product_description.get_text(strip=True) if product_description else 'Description not found'

    # Extract bullet points
    bullet_points = []
    bullet_section = soup.find('ul', {'class': 'a-unordered-list a-vertical a-spacing-mini'})
    if bullet_section:
        for li in bullet_section.find_all('li'):
            bullet = li.find('span', {'class': 'a-list-item'})
            if bullet:
                bullet_points.append(bullet.get_text(strip=True))
    bullet_points_text = ', '.join(bullet_points) if bullet_points else 'Bullet points not found'

    # Extract average customer rating
    average_rating = soup.find('span', {'class': 'a-icon-alt'})
    average_rating = average_rating.get_text(strip=True) if average_rating else 'Average rating not found'

    # Extract total number of customer reviews
    total_reviews = soup.find('span', {'id': 'acrCustomerReviewText'})
    total_reviews = total_reviews.get_text(strip=True) if total_reviews else 'Total reviews not found'

    # Extract the main image link.
    # Bug fix: image_tag['src'] raised KeyError when the tag had no 'src'
    # attribute; Tag.get() falls back gracefully instead.
    image_tag = soup.find('img', {'id': 'landingImage'})
    image_link = image_tag.get('src', 'Image link not found') if image_tag else 'Image link not found'

    return {
        'title': product_title,
        'price': product_price,
        'description': product_description,
        'bullet_points': bullet_points_text,
        'average_rating': average_rating,
        'total_reviews': total_reviews,
        'image_link': image_link
    }
181
+
182
+
183
+ ###
184
+ ###
185
+
186
# Default Hugging Face Inference API model for the code agent.
model = HfApiModel()
# NOTE(review): only amazon_scraper is registered as a tool here, even though
# scrape_product_details is also decorated with @tool — it is only invoked
# directly from inside amazon_scraper. Confirm this is intentional.
agent = CodeAgent(
    tools=[amazon_scraper],
    model=model,
    # Modules the agent's generated code is permitted to import at runtime.
    additional_authorized_imports=['requests', 'bs4', 'pandas', 'gradio', 'concurrent.futures', 'csv', 'json']
)
192
+
193
+ # Assuming the agent.run call returns a dictionary with 'products' key
194
+ #####
195
+ #####
196
+ ##
197
+ import gradio as gr
198
+ import pandas as pd
199
+ from typing import Dict, List, Tuple, Union
200
+ from smolagents import CodeAgent, HfApiModel, tool
201
+
202
def process_agent_response(response: Union[Dict, str]) -> Tuple[pd.DataFrame, str]:
    """
    Process the agent's response and convert it to a DataFrame.

    Args:
        response: Either a dict (with a 'products' list, or an 'error'
            message), or a string holding a Python-literal representation
            of such a dict.

    Returns:
        Tuple of (DataFrame, error message). On success the error message
        is ""; on failure the DataFrame is empty.
    """
    if isinstance(response, dict):
        if 'error' in response:
            return pd.DataFrame(), f"Error: {response['error']}"
        elif 'products' in response:
            try:
                df = pd.DataFrame(response['products'])
                return df, ""
            except Exception as e:
                return pd.DataFrame(), f"Error processing data: {str(e)}"
    elif isinstance(response, str):
        # Agents sometimes return the dict serialized as a string; parse it
        # safely — literal_eval never executes code, unlike eval().
        try:
            import ast
            data = ast.literal_eval(response)
            if isinstance(data, dict):
                if 'products' in data:
                    df = pd.DataFrame(data['products'])
                else:
                    df = pd.DataFrame([data])
                return df, ""
        # Bug fix: the original bare `except:` also swallowed
        # KeyboardInterrupt/SystemExit. Catch only the parse failures
        # that ast.literal_eval actually raises for malformed input.
        except (ValueError, SyntaxError):
            pass
    return pd.DataFrame(), "Unexpected response format"
230
+
231
def search_products(keyword: str, max_products: int) -> Tuple[pd.DataFrame, str, str]:
    """
    Ask the agent to search Amazon for products matching a keyword.

    Args:
        keyword: Search term passed through to the agent.
        max_products: Upper bound on the number of products requested.

    Returns:
        Tuple of (results DataFrame, status message, error message).
        On success the error message is ""; on failure the status is "".
    """
    try:
        raw = agent.run(f'Search for {max_products} products with keyword: {keyword}')
        df, error_msg = process_agent_response(raw)

        if df.empty:
            return df, "", error_msg or "No products found"

        # Keep only the well-known columns, in a stable display order.
        preferred = [
            'title', 'price', 'rating', 'reviews', 'description',
            'bullet_points', 'average_rating', 'total_reviews'
        ]
        df = df[[col for col in preferred if col in df.columns]]

        return df, f"Found {len(df)} products", ""

    except Exception as e:
        return pd.DataFrame(), "", f"Search error: {str(e)}"
256
+
257
def answer_product_question(df: pd.DataFrame, question: str) -> str:
    """
    Answer a user question about the currently loaded products via the agent.

    Args:
        df: DataFrame of product rows from a prior search.
        question: Free-form user question about those products.

    Returns:
        The agent's answer as a string, or a guidance/error message.
    """
    if df.empty:
        return "Please search for products first before asking questions."

    try:
        # Record form is easier for the model to read than a raw DataFrame repr.
        products_context = df.to_dict('records')

        prompt = f"""Based on these products:
        {products_context}

        Question: {question}

        Please provide a clear and concise answer using only the information available in the product data."""

        response = agent.run(prompt)

        # The agent may hand back a dict; normalize that case to text.
        return str(response) if isinstance(response, dict) else response

    except Exception as e:
        return f"Error processing question: {str(e)}"
284
+
285
def create_interface() -> gr.Interface:
    """
    Create the Gradio interface with search and Q&A functionality.

    Layout: a search column (keyword + product count + button) beside a
    question column, with the results table and answer panel below. The
    latest search results are kept in a gr.State so the Q&A handler can
    read them without re-scraping.
    """
    with gr.Blocks(title="Amazon Product Search & Q&A") as interface:
        gr.Markdown("# Amazon Product Search and Q&A System")

        # Status message for feedback (search progress / errors)
        status_msg = gr.Markdown("")

        with gr.Row():
            with gr.Column():
                keyword_input = gr.Textbox(
                    label="Product Keyword or Name",
                    placeholder="Enter product keyword...",
                    scale=3
                )
                max_products = gr.Slider(
                    minimum=1,
                    maximum=10,
                    value=5,
                    step=1,
                    label="Number of Products",
                )
                search_button = gr.Button("Search Products", variant="primary")

            with gr.Column():
                question_input = gr.Textbox(
                    label="Ask about the products",
                    placeholder="Enter your question about the products...",
                    scale=3
                )
                ask_button = gr.Button("Ask Question", variant="secondary")

        # Output components: results table (wider) next to the answer panel
        with gr.Row():
            with gr.Column(scale=2):
                product_table = gr.Dataframe(
                    label="Product Search Results",
                    interactive=False,
                    wrap=True
                )
            with gr.Column(scale=1):
                answer_output = gr.Markdown(
                    label="Answer to Your Question"
                )

        # Store the latest results DataFrame so on_question can use it later
        df_state = gr.State(pd.DataFrame())

        def on_search(keyword: str, max_products: int) -> Tuple[pd.DataFrame, pd.DataFrame, str]:
            # TODO add thinking in an output
            # Returns the df twice: once for display, once for df_state.
            df, status, error = search_products(keyword, max_products)
            message = error if error else status
            return df, df, gr.Markdown(message)

        def on_question(df: pd.DataFrame, question: str) -> str:
            # TODO add thinking in an output
            return answer_product_question(df, question)

        # Connect components
        search_button.click(
            fn=on_search,
            inputs=[keyword_input, max_products],
            outputs=[product_table, df_state, status_msg]
        )

        ask_button.click(
            fn=on_question,
            inputs=[df_state, question_input],
            outputs=answer_output
        )

    return interface
359
+
360
def main():
    """Build the Gradio app and serve it."""
    # Create and launch the interface
    interface = create_interface()
    interface.launch(
        debug=True,
        server_name="0.0.0.0",  # listen on all interfaces (required in containers/Spaces)
        server_port=7860  # conventional Hugging Face Spaces port
    )

if __name__ == "__main__":
    main()