import firebase_admin # type: ignore from firebase_admin import credentials, firestore # type: ignore from joblib import dump, load # type: ignore import datetime import re from sklearn.feature_extraction.text import TfidfVectorizer # type: ignore from sklearn.naive_bayes import MultinomialNB # type: ignore import pandas as pd # type: ignore # التهيئة مرة واحدة فقط if not firebase_admin._apps: # تأكد من وضع المسار الصحيح لملف التوثيق Firebase cred = credentials.Certificate("D:/app-sentinel-7qnr19-firebase-adminsdk-kjmbe-f38e16a432.json") firebase_admin.initialize_app(cred) db = firestore.client() # تحميل النموذج الحالي والمحول try: model = load('model.joblib') vectorizer = load('vectorizer.joblib') print("Model and vectorizer loaded successfully.") except Exception as e: model = None vectorizer = None print(f"Model and vectorizer not found. You need to train the model. Error: {e}") # 1. وظيفة لتحليل النصوص وتصنيفها def classify_and_store_message(message): global model, vectorizer try: if not model or not vectorizer: raise ValueError("Model or vectorizer not loaded. Train or load the model first.") # تحويل الرسالة إلى سمات رقمية message_vector = vectorizer.transform([message]) classification = model.predict(message_vector)[0] # إعداد البيانات للتخزين message_data = { 'text': message, 'classification': classification, 'timestamp': datetime.datetime.now() } # تخزين الرسالة في مجموعة Firestore حسب التصنيف collection_name = classification.split('_')[0] # استخدام الجزء الأول من التصنيف كاسم المجموعة db.collection(collection_name).add(message_data) # تخزين الرسالة في مجموعة 'all_messages' لجميع الرسائل db.collection('all_messages').add(message_data) # تخزين الرسالة في مجموعة 'recently_analyzed_messages' db.collection('recently_analyzed_messages').add(message_data) print(f"Message classified as {classification} and stored in Firestore.") return classification except Exception as e: print(f"Error classifying message: {e}") return None # 2. وظيفة لتحليل النصوص المدخلة def analyze_input_text(): print("\n--- SMS Classification and Link Analysis Tool ---") while True: user_input = input("Enter a message to classify (or type 'exit' to quit): ").strip() if user_input.lower() == 'exit': print("Exiting the tool. Goodbye!") break # استخراج الروابط من النص المدخل links = re.findall(r'(https?://[^\s]+)', user_input) if links: print(f"Detected links: {links}") # تحليل الروابط (يمكن تطوير التحليل ليشمل أدوات أو خدمات خارجية) for link in links: # افتراض تحليل بسيط (يمكن تحسينه لاحقًا) if "secure" in link or "safe" in link: print(f"Link '{link}' appears safe.") else: print(f"Link '{link}' might be suspicious.") else: print("No links detected in the message.") # تصنيف الرسالة classification = classify_and_store_message(user_input) if classification: print(f"Message classified as: {classification}") else: print("Unable to classify the message. Please try again.") # 3. دالة لتحديث النموذج مع بيانات جديدة def update_model_with_new_data(new_messages, new_labels): global model, vectorizer try: # تحميل البيانات الحالية data = { 'message': new_messages, 'label': new_labels } df_new = pd.DataFrame(data) # تحديث المحول والنموذج if vectorizer is None or model is None: vectorizer = TfidfVectorizer() X_new = vectorizer.fit_transform(df_new['message']) else: X_new = vectorizer.transform(df_new['message']) # جمع البيانات الجديدة مع القديمة وإعادة التدريب y_new = df_new['label'] if model is None: model = MultinomialNB() model.partial_fit(X_new, y_new, classes=['spam_phishing', 'social_phishing', 'news_phishing', 'advertisement_phishing']) # حفظ النموذج الجديد dump(model, 'model.joblib') dump(vectorizer, 'vectorizer.joblib') print("Model updated and saved successfully.") except Exception as e: print(f"Error updating model: {e}") # 4. دالة لاختبار النظام def test_system(): test_messages = [ "Win a free vacation now! Visit https://spam-link.com", "Breaking news: Major stock updates today.", "Don't forget our meeting tomorrow at 10 AM.", "Click here to secure your bank account: https://phishing-link.com", "Exclusive offers just for you! Buy now at https://ad-link.com" ] for msg in test_messages: print(f"\nAnalyzing message: {msg}") analyze_input_text(msg) # 5. وظيفة للتصحيح اليدوي def correct_classification(message_id, correct_label): try: # جلب الرسالة من Firestore message_ref = db.collection('all_messages').document(message_id) message_data = message_ref.get().to_dict() if not message_data: print("Message not found.") return # تحديث التصنيف في Firestore message_ref.update({'classification': correct_label}) # إضافة البيانات إلى نموذج التدريب الجديد update_model_with_new_data([message_data['text']], [correct_label]) print(f"Message classification corrected to {correct_label} and model updated.") except Exception as e: print(f"Error correcting classification: {e}") # تشغيل تحليل النصوص analyze_input_text()