Skip to content

Instantly share code, notes, and snippets.

@tmokmss
Created August 22, 2025 05:44
Show Gist options
  • Select an option

  • Save tmokmss/7a9f144c16575c63270cc9f82bbfa7f0 to your computer and use it in GitHub Desktop.

Select an option

Save tmokmss/7a9f144c16575c63270cc9f82bbfa7f0 to your computer and use it in GitHub Desktop.
Python implementation to publish a event to AppSync Events using HTTP protocol
import os
import boto3
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest
import requests
import json
def send_event(channel_path: str, payload: any) -> None:
"""
A funtion to send an event to AWS AppSync Events
"""
endpoint = os.environ.get('EVENT_HTTP_ENDPOINT', "https://xxxxx.appsync-api.us-east-1.amazonaws.com/event")
region = os.environ.get('AWS_REGION', 'us-east-1')
method = "POST"
request_body = {
"channel": f"default/{channel_path}",
"events": [json.dumps(payload)]
}
session = boto3.Session(
region_name=region
)
request = AWSRequest(
method,
endpoint,
headers={'content-type': 'application/json'},
data=json.dumps(request_body)
)
SigV4Auth(session.get_credentials(), 'appsync', region).add_auth(request)
try:
response = requests.request(method, endpoint, headers=dict(request.headers), data=request.body, timeout=5)
response.raise_for_status()
print(f'Response Status: {response.status_code}')
print(f'Response Body: {response.content.decode("utf-8")}')
except Exception as e:
print(e)
send_event("test", {"foo": "bar"})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment