import firebase_admin # type: ignore
from firebase_admin import credentials, firestore # type: ignore
from joblib import dump, load # type: ignore
import datetime
import re
from sklearn.feature_extraction.text import TfidfVectorizer # type: ignore
from sklearn.naive_bayes import MultinomialNB # type: ignore
import pandas as pd # type: ignore
# التهيئة مرة واحدة فقط
if not firebase_admin._apps:
# تأكد من وضع المسار الصحيح لملف التوثيق Firebase
cred = credentials.Certificate("D:/app-sentinel-7qnr19-firebase-adminsdk-kjmbe-f38e16a432.json")
db = firestore.client()
# تحميل النموذج الحالي والمحول
model = load('model.joblib')
vectorizer = load('vectorizer.joblib')
print("Model and vectorizer loaded successfully.")
except Exception as e:
model = None
vectorizer = None
print(f"Model and vectorizer not found. You need to train the model. Error: {e}")
# 1. وظيفة لتحليل النصوص وتصنيفها
def classify_and_store_message(message):
global model, vectorizer
if not model or not vectorizer:
raise ValueError("Model or vectorizer not loaded. Train or load the model first.")
# تحويل الرسالة إلى سمات رقمية
message_vector = vectorizer.transform([message])
classification = model.predict(message_vector)[0]
# إعداد البيانات للتخزين
message_data = {
'text': message,
'classification': classification,
# تخزين الرسالة في مجموعة Firestore حسب التصنيف
collection_name = classification.split('_')[0] # استخدام الجزء الأول من التصنيف كاسم المجموعة
# تخزين الرسالة في مجموعة 'all_messages' لجميع الرسائل
# تخزين الرسالة في مجموعة 'recently_analyzed_messages'
print(f"Message classified as {classification} and stored in Firestore.")
return classification
except Exception as e:
print(f"Error classifying message: {e}")
return None
# 2. وظيفة لتحليل النصوص المدخلة
def analyze_input_text():
print("\n--- SMS Classification and Link Analysis Tool ---")
while True:
user_input = input("Enter a message to classify (or type 'exit' to quit): ").strip()
if user_input.lower() == 'exit':
print("Exiting the tool. Goodbye!")
# استخراج الروابط من النص المدخل
links = re.findall(r'(https?://[^\s]+)', user_input)
if links:
print(f"Detected links: {links}")
# تحليل الروابط (يمكن تطوير التحليل ليشمل أدوات أو خدمات خارجية)
for link in links:
# افتراض تحليل بسيط (يمكن تحسينه لاحقًا)
if "secure" in link or "safe" in link:
print(f"Link '{link}' appears safe.")
print(f"Link '{link}' might be suspicious.")
print("No links detected in the message.")
# تصنيف الرسالة
classification = classify_and_store_message(user_input)
if classification:
print(f"Message classified as: {classification}")
print("Unable to classify the message. Please try again.")
# 3. دالة لتحديث النموذج مع بيانات جديدة
def update_model_with_new_data(new_messages, new_labels):
global model, vectorizer
# تحميل البيانات الحالية
data = {
'message': new_messages,
'label': new_labels
df_new = pd.DataFrame(data)
# تحديث المحول والنموذج
if vectorizer is None or model is None:
vectorizer = TfidfVectorizer()
X_new = vectorizer.fit_transform(df_new['message'])
X_new = vectorizer.transform(df_new['message'])
# جمع البيانات الجديدة مع القديمة وإعادة التدريب
y_new = df_new['label']
if model is None:
model = MultinomialNB()
model.partial_fit(X_new, y_new, classes=['spam_phishing', 'social_phishing', 'news_phishing', 'advertisement_phishing'])
# حفظ النموذج الجديد
dump(model, 'model.joblib')
dump(vectorizer, 'vectorizer.joblib')
print("Model updated and saved successfully.")
except Exception as e:
print(f"Error updating model: {e}")
# 4. دالة لاختبار النظام
def test_system():
test_messages = [
"Win a free vacation now! Visit",
"Breaking news: Major stock updates today.",
"Don't forget our meeting tomorrow at 10 AM.",
"Click here to secure your bank account:",
"Exclusive offers just for you! Buy now at"
for msg in test_messages:
print(f"\nAnalyzing message: {msg}")
# 5. وظيفة للتصحيح اليدوي
def correct_classification(message_id, correct_label):
# جلب الرسالة من Firestore
message_ref = db.collection('all_messages').document(message_id)
message_data = message_ref.get().to_dict()
if not message_data:
print("Message not found.")
# تحديث التصنيف في Firestore
message_ref.update({'classification': correct_label})
# إضافة البيانات إلى نموذج التدريب الجديد
update_model_with_new_data([message_data['text']], [correct_label])
print(f"Message classification corrected to {correct_label} and model updated.")
except Exception as e:
print(f"Error correcting classification: {e}")
# تشغيل تحليل النصوص