Last active
November 9, 2020 11:42
-
-
Save judoole/a5c7d43db777ff8b1d2be80b55fcc757 to your computer and use it in GitHub Desktop.
Blogpost on OpsGenie alerting in Airflow https://medium.com/p/239ddea61d0a
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # Slack friendly message | |
| _DEFAULT_DESCRIPTION = """ | |
| *DAG:* | |
| <{{dag_url}}|{{ti.dag_id}}> | |
| *Task:* | |
| {{ti.task_id}} | |
| *Execution date:* | |
| {{ds}} | |
| *Log:* | |
| {{ti.log_url}} | |
| *Exception:* | |
| ```{{exception}}``` | |
| """ | |
| class OpsGenieExceptionReporter(): | |
| def __init__(self, | |
| message: str = 'Oh no {{ti.task_id}} in {{ti.dag_id}} failed', | |
| priority: str = 'P3', | |
| tags: List[str] = ["Airflow"], | |
| responders: Dict = None, | |
| team_id: str = None, | |
| team_name: str = "My-Team", | |
| description: str = _DEFAULT_DESCRIPTION, | |
| connection_id: str = "opsgenie_default" | |
| *args, **kwargs) -> None: | |
| super(OpsGenieExceptionReporter, self).__init__(*args, **kwargs) | |
| self.message = message | |
| self.priority = priority | |
| self.tags = tags or [] | |
| self.responders = responders | |
| self.team_id = team_id | |
| self.description = description | |
| self.team_name = team_name | |
| self.connection_id = connection_id | |
| if team_name is None and responders is None and team_id is None: | |
| raise ValueError("team_name, team_id or responders must be set") | |
| # Create responders if team_id/name is set | |
| if team_name: | |
| self.responders = [{"name": team_name, "type": "team"}] | |
| elif team_id: | |
| self.responders = [{"id": team_id, "type": "team"}] | |
| def __call__(self, context) -> requests.Response: | |
| hook = OpsgenieAlertHook(self.connection_id) | |
| return hook.execute(self.create_request_body(context)) | |
| def create_request_body(self, context) -> Dict: | |
| # We use a task to render strings | |
| task: BaseOperator = context["task"] | |
| task_instance: TaskInstance = context["ti"] | |
| # Update context with some exception handling | |
| context.update(self._create_extra_context(context)) | |
| json = { | |
| "message": task.render_template(self.message, context), | |
| "description": task.render_template(self.description, context), | |
| "responders": self.responders, | |
| "tags": self.tags, | |
| "details": { | |
| "logUrl": task_instance.log_url, | |
| "jobId": task_instance.job_id or "", | |
| "dagId": task_instance.dag_id, | |
| "taskId": task_instance.task_id | |
| }, | |
| "priority": self.priority, | |
| "alias": task.render_template(self.alias, context) | |
| } | |
| return json |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment