Skip to content

Instantly share code, notes, and snippets.

@judoole
Last active November 9, 2020 11:42
Show Gist options
  • Select an option

  • Save judoole/a5c7d43db777ff8b1d2be80b55fcc757 to your computer and use it in GitHub Desktop.

Select an option

Save judoole/a5c7d43db777ff8b1d2be80b55fcc757 to your computer and use it in GitHub Desktop.
Blogpost on OpsGenie alerting in Airflow https://medium.com/p/239ddea61d0a
# Default alert description, written with Slack-friendly markup: *bold*
# section headings, a <url|label> link for the DAG and a ``` fence around the
# exception text. The {{...}} placeholders are template variables that are
# filled in when the description is rendered.
_DEFAULT_DESCRIPTION = "\n".join((
    "",  # the text starts with a newline
    "*DAG:*",
    "<{{dag_url}}|{{ti.dag_id}}>",
    "*Task:*",
    "{{ti.task_id}}",
    "*Execution date:*",
    "{{ds}}",
    "*Log:*",
    "{{ti.log_url}}",
    "*Exception:*",
    "```{{exception}}```",
    "",  # ...and ends with one
))
class OpsGenieExceptionReporter:
    """Callable that reports a failed Airflow task to OpsGenie as an alert.

    An instance is called with an Airflow task context (presumably installed
    as a failure callback -- confirm against the DAG definitions). It renders
    the configured templates against that context and sends the resulting
    payload through ``OpsgenieAlertHook``.
    """

    def __init__(self,
                 message: str = 'Oh no {{ti.task_id}} in {{ti.dag_id}} failed',
                 priority: str = 'P3',
                 tags: List[str] = ["Airflow"],
                 responders: List[Dict] = None,
                 team_id: str = None,
                 team_name: str = "My-Team",
                 description: str = _DEFAULT_DESCRIPTION,
                 connection_id: str = "opsgenie_default",
                 *args,
                 alias: str = None,
                 **kwargs) -> None:
        """Store the alert settings and work out who the alert is routed to.

        Args:
            message: Template for the alert message.
            priority: Value sent as the alert's ``priority`` field.
            tags: Tags attached to the alert; a falsy value means no tags.
            responders: Explicit list of responder dicts. When given, it is
                used as-is and ``team_name``/``team_id`` are not consulted.
            team_id: Team id used to build a single team responder. Only
                consulted when ``responders`` and ``team_name`` are both
                falsy.
            team_name: Team name used to build a single team responder when
                no explicit ``responders`` are given. Because it has a
                non-empty default, pass ``team_name=None`` to route by
                ``team_id`` instead.
            description: Template for the alert description.
            connection_id: Connection id handed to ``OpsgenieAlertHook``.
            *args: Forwarded unchanged to the parent ``__init__``.
            alias: Optional template for the alert's ``alias`` field
                (keyword-only). When unset, no alias is sent.
            **kwargs: Forwarded unchanged to the parent ``__init__``.

        Raises:
            ValueError: If ``team_name``, ``team_id`` and ``responders`` are
                all ``None``.
        """
        super().__init__(*args, **kwargs)
        if team_name is None and responders is None and team_id is None:
            raise ValueError("team_name, team_id or responders must be set")

        self.message = message
        self.priority = priority
        # Copy so this instance never aliases the shared default list (or a
        # list the caller keeps mutating).
        self.tags = list(tags) if tags else []
        self.team_id = team_id
        self.team_name = team_name
        self.description = description
        self.connection_id = connection_id
        self.alias = alias

        # Explicit responders win. Previously the non-empty team_name default
        # silently replaced any responders the caller passed in.
        self.responders = responders
        if not responders:
            if team_name:
                self.responders = [{"name": team_name, "type": "team"}]
            elif team_id:
                self.responders = [{"id": team_id, "type": "team"}]

    def __call__(self, context) -> requests.Response:
        """Build the alert payload for ``context`` and send it.

        Returns whatever ``OpsgenieAlertHook.execute`` returns.
        """
        hook = OpsgenieAlertHook(self.connection_id)
        return hook.execute(self.create_request_body(context))

    def create_request_body(self, context) -> Dict:
        """Render the templates against ``context`` and return the payload.

        Note that ``context`` is updated in place with the extra template
        variables before rendering.
        """
        # The task is only used for its template rendering support.
        task: BaseOperator = context["task"]
        task_instance: TaskInstance = context["ti"]

        # NOTE(review): _create_extra_context is not defined in the visible
        # part of this class; presumably it supplies values such as `dag_url`
        # used by the default description -- confirm it exists.
        context.update(self._create_extra_context(context))

        body = {
            "message": task.render_template(self.message, context),
            "description": task.render_template(self.description, context),
            "responders": self.responders,
            "tags": self.tags,
            "details": {
                "logUrl": task_instance.log_url,
                "jobId": task_instance.job_id or "",
                "dagId": task_instance.dag_id,
                "taskId": task_instance.task_id,
            },
            "priority": self.priority,
        }
        # Only send an alias when one was configured; the attribute used to be
        # read without ever being set, which raised AttributeError.
        if self.alias:
            body["alias"] = task.render_template(self.alias, context)
        return body
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment