134 lines
4.4 KiB
Python
134 lines
4.4 KiB
Python
#!/usr/bin/python3
|
|
|
|
import os
|
|
import ssl
|
|
import json
|
|
import subprocess
|
|
import shlex
|
|
from datetime import datetime
|
|
from urllib.parse import urlparse, parse_qs
|
|
from http.server import HTTPServer, BaseHTTPRequestHandler
|
|
|
|
# Load configuration from /etc/webhookd.conf
|
|
CONFIG_FILE = '/etc/webhookd.conf'
|
|
DEFAULT_CONFIG = {
|
|
'SCRIPT_FOLDER': '/webhooks/',
|
|
'HOST': '0.0.0.0',
|
|
'PORT': 8443,
|
|
'CERT_FILE': 'server.crt',
|
|
'KEY_FILE': 'server.key'
|
|
}
|
|
|
|
def load_config():
|
|
config = DEFAULT_CONFIG.copy()
|
|
if os.path.isfile(CONFIG_FILE):
|
|
with open(CONFIG_FILE, 'r') as f:
|
|
for line in f:
|
|
line = line.strip()
|
|
if not line or line.startswith('#'):
|
|
continue
|
|
key, _, value = line.partition('=')
|
|
key = key.strip()
|
|
value = value.strip()
|
|
if key and value:
|
|
config[key] = value
|
|
return config
|
|
|
|
CONFIG = load_config()
|
|
|
|
class WebhookHandler(BaseHTTPRequestHandler):
|
|
def do_POST(self):
|
|
# Parse URL for parameters
|
|
url_parts = urlparse(self.path)
|
|
script_name = os.path.basename(url_parts.path)
|
|
query_params = parse_qs(url_parts.query)
|
|
|
|
# Extract additional parameters from JSON body if provided
|
|
content_length = int(self.headers.get('Content-Length', 0))
|
|
if content_length > 0:
|
|
post_data = self.rfile.read(content_length).decode('utf-8')
|
|
try:
|
|
request_data = json.loads(post_data)
|
|
query_params.update(request_data)
|
|
except json.JSONDecodeError:
|
|
self.respond(400, {
|
|
'script': script_name,
|
|
'timestamp': datetime.utcnow().isoformat(),
|
|
'duration': 0,
|
|
'exitcode': None,
|
|
'stderr': 'Invalid JSON in request body',
|
|
'stdout': ''
|
|
})
|
|
return
|
|
|
|
# Build the script path and arguments
|
|
script_path = os.path.join(CONFIG['SCRIPT_FOLDER'], script_name)
|
|
|
|
# Check if script exists
|
|
if not os.path.isfile(script_path):
|
|
self.respond(404, {
|
|
'script': script_name,
|
|
'timestamp': datetime.utcnow().isoformat(),
|
|
'duration': 0,
|
|
'exitcode': None,
|
|
'stderr': 'The script cannot be found',
|
|
'stdout': ''
|
|
})
|
|
return
|
|
|
|
# Prepare parameters
|
|
args = [script_path]
|
|
for key, values in query_params.items():
|
|
for value in values:
|
|
args.append(f"{shlex.quote(key)}={shlex.quote(value)}")
|
|
|
|
# Execute the script
|
|
start_time = datetime.utcnow()
|
|
try:
|
|
result = subprocess.run(
|
|
args,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
text=True
|
|
)
|
|
duration = (datetime.utcnow() - start_time).total_seconds()
|
|
self.respond(200, {
|
|
'script': script_name,
|
|
'timestamp': start_time.isoformat(),
|
|
'duration': duration,
|
|
'exitcode': result.returncode,
|
|
'stderr': result.stderr,
|
|
'stdout': result.stdout
|
|
})
|
|
except Exception as e:
|
|
duration = (datetime.utcnow() - start_time).total_seconds()
|
|
self.respond(500, {
|
|
'script': script_name,
|
|
'timestamp': start_time.isoformat(),
|
|
'duration': duration,
|
|
'exitcode': None,
|
|
'stderr': str(e),
|
|
'stdout': ''
|
|
})
|
|
|
|
def respond(self, status_code, response_data):
|
|
self.send_response(status_code)
|
|
self.send_header('Content-Type', 'application/json')
|
|
self.end_headers()
|
|
self.wfile.write(json.dumps(response_data).encode('utf-8'))
|
|
|
|
# Main function to start the server
|
|
def run_server():
|
|
httpd = HTTPServer((CONFIG['HOST'], int(CONFIG['PORT'])), WebhookHandler)
|
|
context = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH)
|
|
context.load_cert_chain(certfile=CONFIG['CERT_FILE'], keyfile=CONFIG['KEY_FILE'])
|
|
|
|
httpd.socket = context.wrap_socket(httpd.socket, server_side=True)
|
|
|
|
print(f"Starting TLS-enabled HTTP server on {CONFIG['HOST']}:{CONFIG['PORT']}")
|
|
httpd.serve_forever()
|
|
|
|
if __name__ == '__main__':
|
|
run_server()
|
|
|