Even Better Webhook Listener

I made some improvements to the NetBox webhook listener presented in my earlier post. Here is the new one:

import hmac
import json
import logging
from flask import Flask, request
from flask_restplus import Api, Resource, fields

# Flask application plus the flask_restplus API wrapper that serves the
# webhook endpoint; routes are registered under the "webhook" namespace.
app = Flask(__name__)
api = Api(app, version="1.1", title="NetBox Webhook Listener",
        description="Tested with NetBox 2.6.6")
ns = api.namespace("webhook")

# APP_NAME is used both as the logger name and as the log file base name.
APP_NAME = "webhook-listener"
# Shared secret used as the HMAC key when verifying the X-Hook-Signature
# header of incoming requests.
# NOTE(review): the secret is hard-coded here; consider loading it from the
# environment or a config file before deploying.
WEBHOOK_SECRET = "My precious"

# Log INFO and above to "<APP_NAME>.log" (relative to the working directory)
# with timestamp, logger name and level in front of each message.
logger = logging.getLogger(APP_NAME)
logger.setLevel(logging.INFO)
formatter = logging.Formatter("%(asctime)s %(name)s %(levelname)s: %(message)s")
file_logging = logging.FileHandler("{}.log".format(APP_NAME))
file_logging.setFormatter(formatter)
logger.addHandler(file_logging)

# API model describing the fields of a webhook request body; it is attached
# to the POST handler with @ns.expect.
webhook_request = api.model("Webhook request from NetBox", {
    'username': fields.String,
    'data': fields.Raw(description="The object data from NetBox"),
    'event': fields.String,
    'timestamp': fields.String,
    'model': fields.String,
    'request_id': fields.String,
})

def do_something_with_the_event(data):
    """Handle one decoded webhook event.

    This is the extension point of the listener: currently it only writes
    the event to the application log at INFO level.

    Args:
        data: The decoded webhook payload passed in by the request handler.
    """
    logger.info(data)

@ns.route("/")
class WebhookListener(Resource):
    """REST resource that receives webhook POST requests from NetBox.

    A request is accepted when its body is not too large, its
    X-Hook-Signature header (if present) matches the HMAC-SHA512 of the body
    keyed with WEBHOOK_SECRET, and the body is a JSON object containing a
    "model" key.
    """

    # Largest request body accepted, in bytes.
    MAX_BODY_BYTES = 1_000_000

    @ns.expect(webhook_request)
    @ns.doc(responses={200: "Ok", 400: "Bad request"})
    def post(self):
        """Validate an incoming webhook and pass it to the event handler.

        Returns:
            A ``(body, status)`` tuple: ``({"result": "ok"}, 200)`` on
            success, or ``({"result": <reason>}, 400)`` when the request is
            too long, carries a wrong signature, or is not a JSON object
            with a "model" key.
        """
        declared_length = request.content_length
        # content_length is None when the client sent no Content-Length
        # header; comparing None with an int would raise TypeError.
        if (declared_length is not None
                and declared_length > self.MAX_BODY_BYTES):
            # To prevent memory allocation attacks
            logger.error("Content too long (%s)", declared_length)
            return {"result": "Content too long"}, 400

        input_data = request.get_data()
        # Re-check the real size: without a declared length the check above
        # cannot run, so this is the only limit applied to such bodies.
        if len(input_data) > self.MAX_BODY_BYTES:
            logger.error("Content too long (%s)", len(input_data))
            return {"result": "Content too long"}, 400

        x_hook_signature = request.headers.get("X-Hook-Signature")
        if x_hook_signature:
            input_hmac = hmac.new(
                key=WEBHOOK_SECRET.encode(),
                msg=input_data,
                digestmod="sha512"
            )
            # Compare as bytes: compare_digest() raises TypeError for str
            # arguments that contain non-ASCII characters, which a client
            # could send in the header.
            if not hmac.compare_digest(
                    input_hmac.hexdigest().encode(),
                    x_hook_signature.encode()):
                logger.error("Invalid message signature")
                return {"result": "Invalid message signature"}, 400
            logger.info("Message signature checked ok")
        else:
            # NOTE(review): requests without a signature header are still
            # accepted, so the secret only protects senders that choose to
            # sign. Consider rejecting unsigned requests.
            logger.info("No message signature to check")

        try:
            input_dict = json.loads(input_data)
        except ValueError:
            # ValueError covers json.JSONDecodeError and also the
            # UnicodeDecodeError raised for bodies that are not valid UTF-8.
            input_dict = None

        # Only a JSON object can carry the "model" key; lists, strings and
        # numbers are rejected instead of slipping through or raising.
        if not isinstance(input_dict, dict) or "model" not in input_dict:
            logger.error("Invalid input: %s", input_data)
            return {"result": "Invalid input"}, 400

        do_something_with_the_event(input_dict)
        return {"result": "ok"}, 200

if __name__ == "__main__":
    # Run the app with Flask's built-in server when executed as a script;
    # host "0.0.0.0" makes it listen on all network interfaces.
    # NOTE(review): the built-in server is meant for development; use a
    # production WSGI server for a real deployment.
    app.run(host="0.0.0.0")

Example from the log file:

2019-10-13 15:06:22,932 webhook-listener INFO: Message signature checked ok
2019-10-13 15:06:22,932 webhook-listener INFO: {'request_id': 'f731df57-08a4-4eff-b07e-3633770e449c', 'event': 'updated', 'data': {'slug': 'my-testing-tenant', 'group': None, 'tags': ['Finland', 'Torilla tavataan', 'Suomi mainittu'], 'name': 'My Testing Tenant', 'id': 7, 'description': '', 'created': '2019-10-12', 'custom_fields': {}, 'last_updated': '2019-10-13T12:06:22.839338Z', 'comments': ''}, 'username': 'admin', 'model': 'tenant', 'timestamp': '2019-10-13 12:06:22.903111'}

Notable changes:

  • Added message signature verification: the X-Hook-Signature header sent by NetBox is checked against an HMAC-SHA512 digest of the request body
  • Added a content length check, as one example of the many things you need to take care of when publishing an interface

Links:

Leave a Reply