Webhook Signature Verification
Verify webhook signatures to ensure authenticity
Why Verify Signatures?
Webhook signature verification ensures that:
- Requests actually come from SnipeRoute (not a malicious actor)
- The payload hasn't been tampered with in transit
- Your endpoint is protected from replay attacks
Always verify webhook signatures in production. Skipping verification exposes your application to security risks.
How Signatures Work
SnipeRoute signs each webhook using HMAC-SHA256 (Stripe-style):
- SnipeRoute combines timestamp and payload:
{timestamp}.{payload} - Signs with your hex-encoded webhook secret
- Sends signature in
sr-signatureheader withv1=prefix - Your server recomputes the signature using the same method
- If signatures match AND timestamp is recent, the request is authentic
Webhook Headers
SnipeRoute sends three headers with each webhook:
sr-signature: v1=8f7b3d9e2a1c4f6b5d8e9a2b3c4d5e6f7a8b9c0d1e2f3a4b5c6d7e8f9a0b1c2
sr-timestamp: 1732980600
sr-event-id: evt_abc123def456| Header | Description |
|---|---|
sr-signature | HMAC-SHA256 signature with v1= prefix |
sr-timestamp | Unix timestamp when the event was signed |
sr-event-id | Unique event ID for idempotency |
The signature is computed from {timestamp}.{payload} using your hex-encoded webhook secret.
Verification Steps
Extract Headers
Get sr-signature, sr-timestamp, and sr-event-id headers.
Check Timestamp
Verify timestamp is within 5 minutes of current time (replay protection).
Get Raw Body
Read the raw request body as bytes (don't parse JSON yet).
Reconstruct Message
Create the signed message: {timestamp}.{payload}
Compute Expected Signature
Use HMAC-SHA256 with your hex-decoded webhook secret.
Compare
Use a constant-time comparison (like hmac.compare_digest) to compare signatures.
Accept or Reject
If signatures match AND timestamp is valid, process the webhook. Otherwise, return 401 Unauthorized.
Python SDK (Recommended)
The SnipeRoute Python SDK provides built-in signature verification:
Using verify_webhook_signature()
from fastapi import FastAPI, Request, HTTPException
from sniperoute import verify_webhook_signature, WebhookVerificationError
app = FastAPI()
WEBHOOK_SECRET = "your_hex_encoded_secret"
@app.post("/webhooks/sniperoute")
async def handle_webhook(request: Request):
body = await request.body()
signature = request.headers.get("sr-signature")
timestamp = request.headers.get("sr-timestamp")
event_id = request.headers.get("sr-event-id")
try:
verify_webhook_signature(
payload=body,
signature_header=signature,
timestamp=timestamp,
secret=WEBHOOK_SECRET,
)
except WebhookVerificationError as e:
raise HTTPException(status_code=401, detail=e.message)
event = await request.json()
print(f"Received event: {event['type']} (id: {event_id})")
return {"status": "received"}Using WebhookSigner Class
For more control, use the WebhookSigner class:
from sniperoute import WebhookSigner, WebhookVerificationError
signer = WebhookSigner(
secret="your_hex_encoded_secret",
tolerance=300 # 5 minutes (default)
)
# Verify a webhook
try:
signer.verify(
payload=body,
signature_header=signature,
timestamp=timestamp,
)
except WebhookVerificationError as e:
print(f"Verification failed: {e.message} (reason: {e.reason})")Error Handling
WebhookVerificationError includes a reason field for programmatic handling:
| Reason | Description |
|---|---|
missing_signature | sr-signature header not provided |
missing_timestamp | sr-timestamp header not provided |
invalid_timestamp | Timestamp is not a valid integer |
timestamp_expired | Timestamp is outside tolerance window |
invalid_signature_format | Signature doesn't start with v1= |
signature_mismatch | Computed signature doesn't match |
try:
verify_webhook_signature(payload, signature, timestamp, secret)
except WebhookVerificationError as e:
if e.reason == "timestamp_expired":
# Possible replay attack or clock skew
log.warning(f"Expired webhook: {e.message}")
elif e.reason == "signature_mismatch":
# Wrong secret or tampered payload
log.error(f"Invalid signature: {e.message}")
raise HTTPException(status_code=401, detail=e.message)Manual Python Implementation
If you're not using the SDK, here's manual implementation:
FastAPI
from fastapi import FastAPI, Request, HTTPException
import hmac
import hashlib
import time
app = FastAPI()
# Hex-encoded secret from your SnipeRoute dashboard
WEBHOOK_SECRET = "your_webhook_secret_from_dashboard"
TIMESTAMP_TOLERANCE = 300 # 5 minutes
@app.post("/webhooks/sniperoute")
async def handle_webhook(request: Request):
# 1. Extract headers
signature_header = request.headers.get("sr-signature")
timestamp = request.headers.get("sr-timestamp")
event_id = request.headers.get("sr-event-id")
if not signature_header or not timestamp:
raise HTTPException(status_code=401, detail="Missing required headers")
# 2. Check timestamp (replay protection)
try:
ts = int(timestamp)
if abs(time.time() - ts) > TIMESTAMP_TOLERANCE:
raise HTTPException(status_code=401, detail="Timestamp too old")
except ValueError:
raise HTTPException(status_code=401, detail="Invalid timestamp")
# 3. Parse signature header (format: "v1=...")
if not signature_header.startswith("v1="):
raise HTTPException(status_code=401, detail="Invalid signature format")
signature = signature_header[3:]
# 4. Get raw body (as bytes)
body = await request.body()
# 5. Reconstruct signed message
message = f"{timestamp}.{body.decode()}"
# 6. Compute expected signature (secret is hex-encoded)
expected_signature = hmac.new(
bytes.fromhex(WEBHOOK_SECRET),
message.encode("utf-8"),
hashlib.sha256
).hexdigest()
# 7. Compare signatures (constant-time)
if not hmac.compare_digest(signature, expected_signature):
raise HTTPException(status_code=401, detail="Invalid signature")
# 8. Signature valid - parse and process event
event = await request.json()
# Handle event (use event_id for idempotency)
print(f"Received event: {event['type']} (id: {event_id})")
return {"status": "received"}Flask
from flask import Flask, request, jsonify
import hmac
import hashlib
import time
app = Flask(__name__)
WEBHOOK_SECRET = "your_webhook_secret_from_dashboard"
TIMESTAMP_TOLERANCE = 300 # 5 minutes
@app.route("/webhooks/sniperoute", methods=["POST"])
def handle_webhook():
# Extract headers
signature_header = request.headers.get("sr-signature")
timestamp = request.headers.get("sr-timestamp")
if not signature_header or not timestamp:
return jsonify({"error": "Missing required headers"}), 401
# Check timestamp
try:
ts = int(timestamp)
if abs(time.time() - ts) > TIMESTAMP_TOLERANCE:
return jsonify({"error": "Timestamp too old"}), 401
except ValueError:
return jsonify({"error": "Invalid timestamp"}), 401
# Parse signature header
if not signature_header.startswith("v1="):
return jsonify({"error": "Invalid signature format"}), 401
signature = signature_header[3:]
# Get raw body
body = request.get_data()
# Reconstruct signed message
message = f"{timestamp}.{body.decode()}"
# Compute expected signature
expected_signature = hmac.new(
bytes.fromhex(WEBHOOK_SECRET),
message.encode("utf-8"),
hashlib.sha256
).hexdigest()
# Compare
if not hmac.compare_digest(signature, expected_signature):
return jsonify({"error": "Invalid signature"}), 401
# Process event
event = request.get_json()
print(f"Received event: {event['type']}")
return jsonify({"status": "received"})Django
from django.http import JsonResponse, HttpResponseForbidden
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_http_methods
import hmac
import hashlib
import json
import time
WEBHOOK_SECRET = "your_webhook_secret_from_dashboard"
TIMESTAMP_TOLERANCE = 300 # 5 minutes
@csrf_exempt
@require_http_methods(["POST"])
def handle_webhook(request):
# Extract headers
signature_header = request.headers.get("sr-signature")
timestamp = request.headers.get("sr-timestamp")
if not signature_header or not timestamp:
return HttpResponseForbidden("Missing required headers")
# Check timestamp
try:
ts = int(timestamp)
if abs(time.time() - ts) > TIMESTAMP_TOLERANCE:
return HttpResponseForbidden("Timestamp too old")
except ValueError:
return HttpResponseForbidden("Invalid timestamp")
# Parse signature header
if not signature_header.startswith("v1="):
return HttpResponseForbidden("Invalid signature format")
signature = signature_header[3:]
# Get raw body
body = request.body
# Reconstruct signed message
message = f"{timestamp}.{body.decode()}"
# Compute expected signature
expected_signature = hmac.new(
bytes.fromhex(WEBHOOK_SECRET),
message.encode("utf-8"),
hashlib.sha256
).hexdigest()
# Compare
if not hmac.compare_digest(signature, expected_signature):
return HttpResponseForbidden("Invalid signature")
# Process event
event = json.loads(body)
print(f"Received event: {event['type']}")
return JsonResponse({"status": "received"})Node.js Implementation
Express
const express = require('express');
const crypto = require('crypto');
const app = express();
const WEBHOOK_SECRET = 'your_webhook_secret_from_dashboard';
const TIMESTAMP_TOLERANCE = 300; // 5 minutes
// Use express.raw() to get raw body as Buffer
app.post('/webhooks/sniperoute', express.raw({ type: 'application/json' }), (req, res) => {
// 1. Extract headers
const signatureHeader = req.headers['sr-signature'];
const timestamp = req.headers['sr-timestamp'];
const eventId = req.headers['sr-event-id'];
if (!signatureHeader || !timestamp) {
return res.status(401).send('Missing required headers');
}
// 2. Check timestamp (replay protection)
const ts = parseInt(timestamp, 10);
if (isNaN(ts) || Math.abs(Date.now() / 1000 - ts) > TIMESTAMP_TOLERANCE) {
return res.status(401).send('Invalid or expired timestamp');
}
// 3. Parse signature header (format: "v1=...")
if (!signatureHeader.startsWith('v1=')) {
return res.status(401).send('Invalid signature format');
}
const signature = signatureHeader.slice(3);
// 4. Get raw body (Buffer)
const body = req.body;
// 5. Reconstruct signed message
const message = `${timestamp}.${body.toString()}`;
// 6. Compute expected signature (secret is hex-encoded)
const expectedSignature = crypto
.createHmac('sha256', Buffer.from(WEBHOOK_SECRET, 'hex'))
.update(message)
.digest('hex');
// 7. Compare (constant-time)
if (!crypto.timingSafeEqual(Buffer.from(signature), Buffer.from(expectedSignature))) {
return res.status(401).send('Invalid signature');
}
// 8. Parse and process event
const event = JSON.parse(body);
console.log(`Received event: ${event.type} (id: ${eventId})`);
res.status(200).send({ status: 'received' });
});
app.listen(3000);Next.js API Route
import type { NextApiRequest, NextApiResponse } from 'next';
import crypto from 'crypto';
const WEBHOOK_SECRET = process.env.WEBHOOK_SECRET!;
const TIMESTAMP_TOLERANCE = 300; // 5 minutes
export const config = {
api: {
bodyParser: false, // Disable body parser to get raw body
},
};
async function getRawBody(req: NextApiRequest): Promise<Buffer> {
const chunks = [];
for await (const chunk of req) {
chunks.push(chunk);
}
return Buffer.concat(chunks);
}
export default async function handler(req: NextApiRequest, res: NextApiResponse) {
if (req.method !== 'POST') {
return res.status(405).json({ error: 'Method not allowed' });
}
// Extract headers
const signatureHeader = req.headers['sr-signature'] as string;
const timestamp = req.headers['sr-timestamp'] as string;
const eventId = req.headers['sr-event-id'] as string;
if (!signatureHeader || !timestamp) {
return res.status(401).json({ error: 'Missing required headers' });
}
// Check timestamp
const ts = parseInt(timestamp, 10);
if (isNaN(ts) || Math.abs(Date.now() / 1000 - ts) > TIMESTAMP_TOLERANCE) {
return res.status(401).json({ error: 'Invalid or expired timestamp' });
}
// Parse signature header
if (!signatureHeader.startsWith('v1=')) {
return res.status(401).json({ error: 'Invalid signature format' });
}
const signature = signatureHeader.slice(3);
// Get raw body
const body = await getRawBody(req);
// Reconstruct signed message
const message = `${timestamp}.${body.toString()}`;
// Compute expected signature
const expectedSignature = crypto
.createHmac('sha256', Buffer.from(WEBHOOK_SECRET, 'hex'))
.update(message)
.digest('hex');
// Compare
if (!crypto.timingSafeEqual(Buffer.from(signature), Buffer.from(expectedSignature))) {
return res.status(401).json({ error: 'Invalid signature' });
}
// Parse and process event
const event = JSON.parse(body.toString());
console.log(`Received event: ${event.type} (id: ${eventId})`);
return res.status(200).json({ status: 'received' });
}Ruby Implementation
Sinatra
require 'sinatra'
require 'openssl'
require 'json'
WEBHOOK_SECRET = 'your_webhook_secret_from_dashboard'
post '/webhooks/sniperoute' do
# Extract signature
signature = request.env['HTTP_X_SNIPEROUTE_SIGNATURE']
halt 401, 'Missing signature' unless signature
# Get raw body
body = request.body.read
# Compute expected signature
expected_signature = OpenSSL::HMAC.hexdigest(
'sha256',
WEBHOOK_SECRET,
body
)
# Compare (constant-time)
unless Rack::Utils.secure_compare(signature, expected_signature)
halt 401, 'Invalid signature'
end
# Parse and process event
event = JSON.parse(body)
puts "Received event: #{event['type']}"
status 200
{ status: 'received' }.to_json
endGo Implementation
package main
import (
"crypto/hmac"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"io"
"net/http"
)
const webhookSecret = "your_webhook_secret_from_dashboard"
func handleWebhook(w http.ResponseWriter, r *http.Request) {
// Extract signature
signature := r.Header.Get("X-SnipeRoute-Signature")
if signature == "" {
http.Error(w, "Missing signature", http.StatusUnauthorized)
return
}
// Get raw body
body, err := io.ReadAll(r.Body)
if err != nil {
http.Error(w, "Error reading body", http.StatusBadRequest)
return
}
// Compute expected signature
mac := hmac.New(sha256.New, []byte(webhookSecret))
mac.Write(body)
expectedSignature := hex.EncodeToString(mac.Sum(nil))
// Compare (constant-time)
if !hmac.Equal([]byte(signature), []byte(expectedSignature)) {
http.Error(w, "Invalid signature", http.StatusUnauthorized)
return
}
// Parse and process event
var event map[string]interface{}
json.Unmarshal(body, &event)
println("Received event:", event["type"])
w.WriteHeader(http.StatusOK)
w.Write([]byte(`{"status":"received"}`))
}
func main() {
http.HandleFunc("/webhooks/sniperoute", handleWebhook)
http.ListenAndServe(":8080", nil)
}Common Mistakes
Parsing Body Before Verification
Don't do this:
event = await request.json() # Parses body
signature = compute_signature(event) # Wrong!Do this:
body = await request.body() # Raw bytes
signature = compute_signature(body)
event = json.loads(body) # Parse after verificationUsing String Comparison
Don't do this:
if signature == expected_signature: # Vulnerable to timing attacksDo this:
if hmac.compare_digest(signature, expected_signature): # Constant-timeHardcoding Secret
Don't do this:
WEBHOOK_SECRET = "sk_webhook_abc123..." # In code!Do this:
WEBHOOK_SECRET = os.getenv("WEBHOOK_SECRET") # From environmentTesting Signature Verification
Generate Test Signature
import hmac
import hashlib
import time
# Hex-encoded secret (64 characters)
webhook_secret = "your_hex_encoded_secret_here"
timestamp = str(int(time.time()))
payload = '{"type":"intent.filled","id":"evt_123","data":{}}'
# Reconstruct signed message
message = f"{timestamp}.{payload}"
signature = hmac.new(
bytes.fromhex(webhook_secret),
message.encode(),
hashlib.sha256
).hexdigest()
print(f"sr-signature: v1={signature}")
print(f"sr-timestamp: {timestamp}")Send Test Webhook with cURL
# Hex-encoded secret
SECRET="your_hex_encoded_secret_here"
TIMESTAMP=$(date +%s)
PAYLOAD='{"type":"intent.filled","id":"evt_123","data":{}}'
MESSAGE="${TIMESTAMP}.${PAYLOAD}"
# Compute signature (note: xxd -r -p decodes hex)
SIGNATURE=$(echo -n "$MESSAGE" | openssl dgst -sha256 -mac HMAC -macopt hexkey:$SECRET | awk '{print $2}')
curl -X POST http://localhost:8000/webhooks/sniperoute \
-H "Content-Type: application/json" \
-H "sr-signature: v1=$SIGNATURE" \
-H "sr-timestamp: $TIMESTAMP" \
-H "sr-event-id: evt_test_123" \
-d "$PAYLOAD"