"""Small anonymous forum: threads, short-lived messages, votes and reports."""

import os
from datetime import datetime, timedelta, timezone

import markdown
from apscheduler.schedulers.background import BackgroundScheduler
from flask import (
    Flask,
    abort,
    flash,
    jsonify,
    redirect,
    render_template,
    request,
    url_for,
)
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///forum.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
# Prefer a key supplied through the environment; the literal is only a
# development fallback and must be overridden in production.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'change_this_secret_key')
db = SQLAlchemy(app)


# -------------------------------
# "escapejs" filter for Jinja2
# -------------------------------
# Single-pass translation table. Besides quotes, backslashes and line breaks,
# it escapes <, > and & (so "</script>" cannot terminate an inline script)
# and U+2028/U+2029 (line terminators in older JavaScript engines).
_JS_ESCAPES = str.maketrans({
    '\\': '\\\\',
    "'": "\\'",
    '"': '\\"',
    '\n': '\\n',
    '\r': '\\r',
    '<': '\\u003C',
    '>': '\\u003E',
    '&': '\\u0026',
    '\u2028': '\\u2028',
    '\u2029': '\\u2029',
})


def escapejs_filter(s):
    """Escape *s* for embedding inside a quoted JavaScript string literal.

    Returns an empty string when *s* is ``None``.
    """
    if s is None:
        return ""
    return s.translate(_JS_ESCAPES)


app.jinja_env.filters['escapejs'] = escapejs_filter


# -------------------------------
# Data models
# -------------------------------
class Thread(db.Model):
    """A discussion thread; deleting it also deletes its messages."""

    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(200), nullable=False)
    # Creation time, taken from the UTC clock at insert.
    timestamp = db.Column(db.DateTime, default=lambda: datetime.now(timezone.utc))
    messages = db.relationship(
        'Message', backref='thread', lazy=True, cascade="all, delete-orphan"
    )


class Message(db.Model):
    """A single post inside a thread."""

    id = db.Column(db.Integer, primary_key=True)
    thread_id = db.Column(db.Integer, db.ForeignKey('thread.id'), nullable=False)
    content = db.Column(db.Text, nullable=False)
    # Creation time, taken from the UTC clock at insert.
    timestamp = db.Column(db.DateTime, default=lambda: datetime.now(timezone.utc))
    # Net score: +1 per upvote, -1 per downvote.
    vote_count = db.Column(db.Integer, default=0)
    # Number of times the message has been reported.
    reports = db.Column(db.Integer, default=0)
    # Soft-delete flag; the thread view filters out rows where it is true.
    removed = db.Column(db.Boolean, default=False)


# -------------------------------
# Basic routes and features
# -------------------------------
# Home page: list of threads
@app.route('/')
def index():
    """Render the list of threads, newest first."""
    threads = Thread.query.order_by(Thread.timestamp.desc()).all()
    return render_template('index.html', threads=threads)


# Create a new thread
@app.route('/new_thread', methods=['GET', 'POST'])
def new_thread():
    """Show the creation form (GET) or create a thread with its first message (POST).

    Both the title and the initial content are required; when either is
    blank the user is sent back to the form with an error flash.
    """
    if request.method == 'POST':
        title = request.form.get('title', '').strip()
        content = request.form.get('content', '').strip()
        if not title or not content:
            flash("Le titre et le contenu initial sont obligatoires.", "error")
            return redirect(url_for('new_thread'))

        # Attach the first message through the relationship so the thread and
        # the message are written in one transaction: a failure can no longer
        # leave an empty thread behind.
        thread_obj = Thread(title=title)
        thread_obj.messages.append(Message(content=content))
        db.session.add(thread_obj)
        db.session.commit()

        flash("Fil de discussion créé.", "success")
        return redirect(url_for('thread', thread_id=thread_obj.id))
    return render_template('new_thread.html')


# View a thread and reply to it
@app.route('/thread/<int:thread_id>', methods=['GET', 'POST'])
def thread(thread_id):
    """Display a thread (GET) or post a reply to it (POST).

    Responds with 404 when the thread does not exist. Only messages that
    are less than 72 hours old and not removed are displayed.
    """
    thread_obj = Thread.query.get_or_404(thread_id)
    if request.method == 'POST':
        content = request.form.get('content', '').strip()
        if not content:
            flash("Le contenu du message ne peut pas être vide.", "error")
            return redirect(url_for('thread', thread_id=thread_id))
        message = Message(thread_id=thread_id, content=content)
        db.session.add(message)
        db.session.commit()
        flash("Réponse postée.", "success")
        return redirect(url_for('thread', thread_id=thread_id))

    # Show only messages younger than 72h that have not been removed.
    expiration_threshold = datetime.now(timezone.utc) - timedelta(hours=72)
    messages = Message.query.filter(
        Message.thread_id == thread_id,
        Message.timestamp >= expiration_threshold,
        Message.removed.is_(False),
    ).order_by(Message.timestamp.asc()).all()
    return render_template('thread.html', thread=thread_obj, messages=messages)


# Voting (upvote/downvote)
@app.route('/vote/<int:message_id>/<action>', methods=['POST'])
def vote(message_id, action):
    """Apply an 'up' or 'down' vote to a message, then return to the referrer.

    Responds with 404 for an unknown message and 400 for any other action.
    """
    message = Message.query.get_or_404(message_id)
    if action == 'up':
        message.vote_count += 1
    elif action == 'down':
        message.vote_count -= 1
    else:
        abort(400)
    db.session.commit()
    flash("Vote enregistré.", "success")
    return redirect(request.referrer or url_for('index'))
# Report a message
@app.route('/report/<int:message_id>', methods=['POST'])
def report(message_id):
    """Increment a message's report counter, then return to the referrer.

    Responds with 404 when the message does not exist.
    """
    message = Message.query.get_or_404(message_id)
    message.reports += 1
    db.session.commit()
    flash("Message signalé.", "success")
    return redirect(request.referrer or url_for('index'))


# Markdown preview of a message
@app.route('/preview', methods=['POST'])
def preview():
    """Render the submitted Markdown and return it as JSON under 'preview'."""
    content = request.form.get('content', '')
    # NOTE(review): the input is rendered as-is and Python-Markdown presumably
    # passes embedded raw HTML through; confirm the client sanitizes the
    # result before inserting it into the page.
    rendered = markdown.markdown(content)
    return jsonify({'preview': rendered})


def _like_pattern(term):
    """Return a LIKE pattern matching *term* literally, anywhere in a column.

    Backslash is the escape character, so callers must pass ``escape='\\\\'``
    (a single backslash) to the LIKE operator.
    """
    escaped = term.replace('\\', '\\\\').replace('%', '\\%').replace('_', '\\_')
    return f'%{escaped}%'


# Search in threads and messages
@app.route('/search')
def search():
    """Search thread titles and message contents for the ``q`` query parameter.

    A blank query redirects to the home page with an error flash. Matching
    is case-insensitive, and LIKE wildcards typed by the user are matched
    literally. Messages that are removed or older than 72 hours are
    excluded, like in the thread view.
    """
    query = request.args.get('q', '').strip()
    if not query:
        flash("Veuillez entrer un terme de recherche.", "error")
        return redirect(url_for('index'))

    pattern = _like_pattern(query)
    expiration_threshold = datetime.now(timezone.utc) - timedelta(hours=72)
    threads = Thread.query.filter(Thread.title.ilike(pattern, escape='\\')).all()
    messages = Message.query.filter(
        Message.content.ilike(pattern, escape='\\'),
        Message.timestamp >= expiration_threshold,
        Message.removed.is_(False),
    ).all()
    return render_template('search.html', query=query, threads=threads, messages=messages)


# Moderation page: list of reported messages
# NOTE(review): no access control is visible on the moderation routes in this
# file; confirm that authentication is enforced elsewhere.
@app.route('/moderate')
def moderate():
    """List non-removed messages with at least 3 reports, most reported first."""
    reported_messages = Message.query.filter(
        Message.reports >= 3,
        Message.removed.is_(False),
    ).order_by(Message.reports.desc()).all()
    return render_template('moderate.html', messages=reported_messages)


# Moderation action: remove a message
@app.route('/remove/<int:message_id>', methods=['POST'])
def remove_message(message_id):
    """Flag a message as removed and go back to the moderation page.

    Responds with 404 when the message does not exist.
    """
    message = Message.query.get_or_404(message_id)
    message.removed = True
    db.session.commit()
    flash("Message retiré.", "success")
    return redirect(url_for('moderate'))


# -------------------------------
# Automatic deletion of messages older than 72 hours
# -------------------------------
def delete_old_messages():
    """Permanently delete every message older than 72 hours.

    Opens its own application context, so it can be called outside a request.
    """
    with app.app_context():
        expiration_threshold = datetime.now(timezone.utc) - timedelta(hours=72)
        # One bulk DELETE instead of loading each row into the session;
        # Query.delete() returns the number of rows matched.
        count = Message.query.filter(
            Message.timestamp < expiration_threshold
        ).delete(synchronize_session=False)
        db.session.commit()
        if count:
            print(f"{count} messages supprimés définitivement.")
# Background job: run the purge of expired messages once an hour.
# NOTE(review): the scheduler starts at import time; with debug=True the
# reloader presumably imports this module in a second process as well, so
# the job may be scheduled twice - confirm.
scheduler = BackgroundScheduler(daemon=True)
scheduler.add_job(
    func=delete_old_messages,
    trigger="interval",
    hours=1,
)
scheduler.start()


# -------------------------------
# Application entry point
# -------------------------------
if __name__ == '__main__':
    # Create the tables before serving the first request.
    with app.app_context():
        db.create_all()
    app.run(debug=True)