#!/usr/bin/python3
"""TLS-enabled webhook daemon.

A POST to ``/<script>`` runs ``<SCRIPT_FOLDER>/<script>`` with one
``key=value`` argument per request parameter (URL query string plus an
optional JSON object body) and returns a JSON report containing the script's
exit code, stdout, stderr and run time.
"""
import json
import os
import shlex
import ssl
import subprocess
import time
from datetime import datetime, timezone
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.parse import parse_qs, urlparse

# Load configuration from /etc/webhookd.conf
CONFIG_FILE = '/etc/webhookd.conf'
DEFAULT_CONFIG = {
    'SCRIPT_FOLDER': '/webhooks/',
    'HOST': '0.0.0.0',
    'PORT': 8443,
    'CERT_FILE': 'server.crt',
    'KEY_FILE': 'server.key',
}


def load_config(path=None):
    """Return the effective configuration as a dict.

    Starts from a copy of ``DEFAULT_CONFIG`` and overlays ``KEY=VALUE`` pairs
    read from the configuration file. Blank lines, lines starting with ``#``
    and entries with an empty key or an empty value are ignored. Values read
    from the file are kept as strings (so ``PORT`` may be a ``str``).

    Args:
        path: Configuration file to read. Defaults to ``CONFIG_FILE``, looked
            up at call time.

    Returns:
        A new dict; the defaults alone when the file does not exist.
    """
    if path is None:
        path = CONFIG_FILE
    config = DEFAULT_CONFIG.copy()
    if not os.path.isfile(path):
        return config
    with open(path, 'r', encoding='utf-8') as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line or line.startswith('#'):
                continue
            # Split on the first '=' only, so values may contain '='.
            key, _, value = line.partition('=')
            key = key.strip()
            value = value.strip()
            if key and value:
                config[key] = value
    return config


CONFIG = load_config()


def _utc_now():
    """Return the current UTC time as a naive ``datetime``.

    Same value and ``isoformat()`` shape as the deprecated
    ``datetime.utcnow()``, so the timestamp format in responses is unchanged.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


class WebhookHandler(BaseHTTPRequestHandler):
    """Request handler that maps POST requests onto scripts in SCRIPT_FOLDER."""

    def do_POST(self):
        """Run the script named by the URL path and reply with a JSON report.

        Responds with:
            200 - the script was started; see ``exitcode``/``stdout``/``stderr``.
            400 - bad ``Content-Length``, or the body is not a UTF-8 JSON object.
            404 - no such file in the script folder.
            500 - the script could not be executed.
        """
        url_parts = urlparse(self.path)
        # basename() discards any directory components, so a request cannot
        # reach outside the script folder.
        script_name = os.path.basename(url_parts.path)
        params = parse_qs(url_parts.query)

        try:
            content_length = int(self.headers.get('Content-Length', 0))
        except (TypeError, ValueError):
            self.respond(400, self._report(
                script_name, _utc_now(),
                stderr='Invalid Content-Length header'))
            return

        # Extract additional parameters from the JSON body if one was sent.
        if content_length > 0:
            raw_body = self.rfile.read(content_length)
            try:
                body = json.loads(raw_body.decode('utf-8'))
            except (UnicodeDecodeError, json.JSONDecodeError):
                self.respond(400, self._report(
                    script_name, _utc_now(),
                    stderr='Invalid JSON in request body'))
                return
            if not isinstance(body, dict):
                # Only an object can be mapped onto key=value arguments.
                self.respond(400, self._report(
                    script_name, _utc_now(),
                    stderr='JSON request body must be an object'))
                return
            params = self._merge_body_params(params, body)

        script_path = os.path.join(CONFIG['SCRIPT_FOLDER'], script_name)
        if not os.path.isfile(script_path):
            self.respond(404, self._report(
                script_name, _utc_now(),
                stderr='The script cannot be found'))
            return

        args = self._build_args(script_path, params)

        started_at = _utc_now()
        # A monotonic clock keeps the duration correct across wall-clock jumps.
        tick = time.monotonic()
        try:
            result = subprocess.run(
                args,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
            )
        except Exception as exc:
            # Top-level boundary: report launch/decoding failures to the
            # client instead of dropping the connection.
            self.respond(500, self._report(
                script_name, started_at,
                duration=time.monotonic() - tick,
                stderr=str(exc)))
        else:
            self.respond(200, self._report(
                script_name, started_at,
                duration=time.monotonic() - tick,
                exitcode=result.returncode,
                stderr=result.stderr,
                stdout=result.stdout))

    @staticmethod
    def _report(script_name, timestamp, duration=0, exitcode=None,
                stderr='', stdout=''):
        """Build the JSON-serialisable response payload."""
        return {
            'script': script_name,
            'timestamp': timestamp.isoformat(),
            'duration': duration,
            'exitcode': exitcode,
            'stderr': stderr,
            'stdout': stdout,
        }

    @staticmethod
    def _merge_body_params(query_params, body):
        """Overlay JSON body members onto the query parameters.

        Every value is normalised to a list of strings, the shape ``parse_qs``
        produces: a scalar becomes a one-element list, strings are kept as
        they are, and any other JSON value is rendered as JSON text
        (``5`` -> ``"5"``, ``true`` -> ``"true"``). A body key replaces a
        query-string key of the same name. Returns a new dict.
        """
        merged = dict(query_params)
        for key, value in body.items():
            items = value if isinstance(value, list) else [value]
            merged[key] = [
                item if isinstance(item, str) else json.dumps(item)
                for item in items
            ]
        return merged

    @staticmethod
    def _build_args(script_path, params):
        """Return the argv list: the script path plus one key=value per value."""
        # NOTE(review): the list is executed without a shell, so shlex.quote()
        # is not needed for safety and its quotes reach the script literally
        # (e.g. k='a b'). Kept as-is because existing hook scripts may rely on
        # this format - confirm before changing.
        args = [script_path]
        args.extend(
            f"{shlex.quote(key)}={shlex.quote(value)}"
            for key, values in params.items()
            for value in values
        )
        return args

    def respond(self, status_code, response_data):
        """Send ``response_data`` as a JSON body with the given HTTP status."""
        payload = json.dumps(response_data).encode('utf-8')
        self.send_response(status_code)
        self.send_header('Content-Type', 'application/json')
        # Lets clients know where the body ends without waiting for close.
        self.send_header('Content-Length', str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)


# Main function to start the server
def run_server():
    """Serve webhooks over TLS on the configured host and port until stopped."""
    address = (CONFIG['HOST'], int(CONFIG['PORT']))
    # The context manager closes the listening socket on the way out.
    with HTTPServer(address, WebhookHandler) as httpd:
        context = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH)
        context.load_cert_chain(certfile=CONFIG['CERT_FILE'],
                                keyfile=CONFIG['KEY_FILE'])
        httpd.socket = context.wrap_socket(httpd.socket, server_side=True)
        print(f"Starting TLS-enabled HTTP server on {CONFIG['HOST']}:{CONFIG['PORT']}")
        httpd.serve_forever()


if __name__ == '__main__':
    run_server()