|
from flask import Flask, request, render_template, redirect, url_for, flash, jsonify, abort |
|
from flask_sqlalchemy import SQLAlchemy |
|
from datetime import datetime, timedelta, timezone |
|
from apscheduler.schedulers.background import BackgroundScheduler |
|
import markdown |
|
|
|
app = Flask(__name__) |
|
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///forum.db' |
|
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False |
|
app.config['SECRET_KEY'] = 'change_this_secret_key' |
|
db = SQLAlchemy(app) |
|
|
|
|
|
|
|
|
|
def escapejs_filter(s): |
|
if s is None: |
|
return "" |
|
return (s.replace('\\', '\\\\') |
|
.replace("'", "\\'") |
|
.replace('"', '\\"') |
|
.replace('\n', '\\n') |
|
.replace('\r', '\\r')) |
|
app.jinja_env.filters['escapejs'] = escapejs_filter |
|
|
|
|
|
|
|
|
|
class Thread(db.Model): |
|
id = db.Column(db.Integer, primary_key=True) |
|
title = db.Column(db.String(200), nullable=False) |
|
timestamp = db.Column(db.DateTime, default=lambda: datetime.now(timezone.utc)) |
|
messages = db.relationship('Message', backref='thread', lazy=True, cascade="all, delete-orphan") |
|
|
|
class Message(db.Model): |
|
id = db.Column(db.Integer, primary_key=True) |
|
thread_id = db.Column(db.Integer, db.ForeignKey('thread.id'), nullable=False) |
|
content = db.Column(db.Text, nullable=False) |
|
timestamp = db.Column(db.DateTime, default=lambda: datetime.now(timezone.utc)) |
|
vote_count = db.Column(db.Integer, default=0) |
|
reports = db.Column(db.Integer, default=0) |
|
removed = db.Column(db.Boolean, default=False) |
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.route('/') |
|
def index(): |
|
threads = Thread.query.order_by(Thread.timestamp.desc()).all() |
|
return render_template('index.html', threads=threads) |
|
|
|
|
|
@app.route('/new_thread', methods=['GET', 'POST']) |
|
def new_thread(): |
|
if request.method == 'POST': |
|
title = request.form.get('title', '').strip() |
|
content = request.form.get('content', '').strip() |
|
if not title or not content: |
|
flash("Le titre et le contenu initial sont obligatoires.", "error") |
|
return redirect(url_for('new_thread')) |
|
|
|
thread = Thread(title=title) |
|
db.session.add(thread) |
|
db.session.commit() |
|
|
|
message = Message(thread_id=thread.id, content=content) |
|
db.session.add(message) |
|
db.session.commit() |
|
flash("Fil de discussion créé.", "success") |
|
return redirect(url_for('thread', thread_id=thread.id)) |
|
return render_template('new_thread.html') |
|
|
|
|
|
@app.route('/thread/<int:thread_id>', methods=['GET', 'POST']) |
|
def thread(thread_id): |
|
thread_obj = Thread.query.get_or_404(thread_id) |
|
if request.method == 'POST': |
|
content = request.form.get('content', '').strip() |
|
if not content: |
|
flash("Le contenu du message ne peut pas être vide.", "error") |
|
return redirect(url_for('thread', thread_id=thread_id)) |
|
message = Message(thread_id=thread_id, content=content) |
|
db.session.add(message) |
|
db.session.commit() |
|
flash("Réponse postée.", "success") |
|
return redirect(url_for('thread', thread_id=thread_id)) |
|
|
|
expiration_threshold = datetime.now(timezone.utc) - timedelta(hours=72) |
|
messages = Message.query.filter( |
|
Message.thread_id == thread_id, |
|
Message.timestamp >= expiration_threshold, |
|
Message.removed == False |
|
).order_by(Message.timestamp.asc()).all() |
|
return render_template('thread.html', thread=thread_obj, messages=messages) |
|
|
|
|
|
@app.route('/vote/<int:message_id>/<action>', methods=['POST']) |
|
def vote(message_id, action): |
|
message = Message.query.get_or_404(message_id) |
|
if action == 'up': |
|
message.vote_count += 1 |
|
elif action == 'down': |
|
message.vote_count -= 1 |
|
else: |
|
abort(400) |
|
db.session.commit() |
|
flash("Vote enregistré.", "success") |
|
return redirect(request.referrer or url_for('index')) |
|
|
|
|
|
@app.route('/report/<int:message_id>', methods=['POST']) |
|
def report(message_id): |
|
message = Message.query.get_or_404(message_id) |
|
message.reports += 1 |
|
db.session.commit() |
|
flash("Message signalé.", "success") |
|
return redirect(request.referrer or url_for('index')) |
|
|
|
|
|
@app.route('/preview', methods=['POST']) |
|
def preview(): |
|
content = request.form.get('content', '') |
|
rendered = markdown.markdown(content) |
|
return jsonify({'preview': rendered}) |
|
|
|
|
|
@app.route('/search') |
|
def search(): |
|
query = request.args.get('q', '').strip() |
|
if not query: |
|
flash("Veuillez entrer un terme de recherche.", "error") |
|
return redirect(url_for('index')) |
|
threads = Thread.query.filter(Thread.title.ilike(f'%{query}%')).all() |
|
messages = Message.query.filter(Message.content.ilike(f'%{query}%')).all() |
|
return render_template('search.html', query=query, threads=threads, messages=messages) |
|
|
|
|
|
@app.route('/moderate') |
|
def moderate(): |
|
reported_messages = Message.query.filter( |
|
Message.reports >= 3, Message.removed == False |
|
).order_by(Message.reports.desc()).all() |
|
return render_template('moderate.html', messages=reported_messages) |
|
|
|
|
|
@app.route('/remove/<int:message_id>', methods=['POST']) |
|
def remove_message(message_id): |
|
message = Message.query.get_or_404(message_id) |
|
message.removed = True |
|
db.session.commit() |
|
flash("Message retiré.", "success") |
|
return redirect(url_for('moderate')) |
|
|
|
|
|
|
|
|
|
def delete_old_messages(): |
|
with app.app_context(): |
|
expiration_threshold = datetime.now(timezone.utc) - timedelta(hours=72) |
|
old_messages = Message.query.filter(Message.timestamp < expiration_threshold).all() |
|
count = len(old_messages) |
|
for msg in old_messages: |
|
db.session.delete(msg) |
|
db.session.commit() |
|
if count: |
|
print(f"{count} messages supprimés définitivement.") |
|
|
|
scheduler = BackgroundScheduler(daemon=True) |
|
scheduler.add_job(func=delete_old_messages, trigger="interval", hours=1) |
|
scheduler.start() |
|
|
|
|
|
|
|
|
|
if __name__ == '__main__': |
|
with app.app_context(): |
|
db.create_all() |
|
app.run(debug=True) |
|
|