
Customizing Apache Superset to Send Alerts via Webhook (REST API) Instead of Email/Slack #30304

Open
Bindu-yadav8 opened this issue Sep 17, 2024 · 6 comments

@Bindu-yadav8

Hello Superset Community,

I’m working on a project where we need to send alerts to a webhook (REST API) with alert details, rather than using the default email or Slack notification channels provided by Superset. I’m looking to customize Superset to achieve this functionality.

Requirements
Custom Notification Channel: Implement a webhook (REST API) to receive alerts.
Alert Details: The webhook should receive detailed information about the alert.
Customization Steps
I understand that to implement this, modifications will likely be needed in the following areas:

Notification Handler:

Implement a custom notification handler to send alerts to the webhook.
The class might need to extend BaseNotification from superset.reports.notifications.base.
Configuration Files:

Update superset_config.py to register the new notification handler.
Ensure the new webhook handler is included in the notification listeners.
Alerting Component:

Modify or extend alert creation and management to support webhook notifications.

Questions

Files to Modify: What specific files and classes should be modified to integrate a custom webhook for alerts?
Implementation Guidance: Could you provide guidance on implementing a custom notifier for webhooks? Any code snippets or references would be greatly appreciated.
Testing: What’s the best way to test this integration to ensure that alerts are properly sent to the webhook?
Any assistance or pointers to relevant documentation would be greatly appreciated!
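On the testing question, one low-friction approach is to point the recipient's target at a local HTTP listener and confirm the payload arrives before involving Superset at all. A stdlib-only sketch (the payload fields here are illustrative, not Superset's):

```python
import json
import threading
import urllib.request
from http.server import BaseHTTPRequestHandler, HTTPServer

received = []  # payloads captured by the test listener


class WebhookHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        body = self.rfile.read(int(self.headers["Content-Length"]))
        received.append(json.loads(body))
        self.send_response(200)
        self.end_headers()

    def log_message(self, *args):  # silence per-request logging
        pass


# Port 0 lets the OS pick a free port
server = HTTPServer(("127.0.0.1", 0), WebhookHandler)
threading.Thread(target=server.serve_forever, daemon=True).start()
url = f"http://127.0.0.1:{server.server_port}/"

# Simulate what a custom notifier would send
payload = {"alert_name": "Test Alert", "text": "Alert Triggered"}
req = urllib.request.Request(
    url,
    data=json.dumps(payload).encode(),
    headers={"Content-Type": "application/json"},
)
with urllib.request.urlopen(req) as resp:
    print(resp.status)  # 200

server.shutdown()
```

Once the listener receives the simulated payload, the same target URL can be dropped into the recipient configuration to verify the real notifier end to end.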

@dosubot dosubot bot added alert-reports Namespace | Anything related to the Alert & Reports feature change:backend Requires changing the backend labels Sep 17, 2024
@fisjac
Contributor

fisjac commented Sep 19, 2024

Can you add more details regarding what alert data/details you're looking to send to the webhook?

I would look into the following files to get started:
AlertReportModal.tsx - frontend modal for alerts/reports
superset/tasks/scheduler.py - celery worker tasks for reports
superset/commands/execute - executor functions for the celery worker to perform

@Bindu-yadav8
Author

Bindu-yadav8 commented Sep 23, 2024

Hi everyone,

I'm currently working on integrating custom webhook notifications for alerts in Apache Superset. My goal is to send alert messages to a POST REST API instead of using traditional notification methods like email or Slack.

What I’ve Implemented:

Custom Webhook Notification Method: I've created a custom script (webhook.py) to handle the notifications.

import json
import logging
from datetime import datetime, timezone

import requests

from superset.reports.notifications.base import BaseNotification
from superset.reports.models import ReportRecipientType

logger = logging.getLogger(__name__)


class WebhookNotification(BaseNotification):  # pylint: disable=too-few-public-methods
    type = ReportRecipientType.WEBHOOK  # Register this notification as 'webhook'

    def send(self) -> None:
        # recipient_config_json is stored as a JSON *string* (Text column),
        # so it must be parsed before the webhook URL can be looked up
        recipient_config = json.loads(self._recipient.recipient_config_json or "{}")
        webhook_url = recipient_config.get("target")

        if not webhook_url:
            logger.error("Webhook URL not found in recipient configuration")
            raise ValueError("Webhook URL not found in recipient configuration")

        # Prepare the message content (adjust as per your needs).
        # NotificationContent has no timestamp field, so generate one here.
        payload = {
            "alert_name": self._content.name or "New Superset Alert",
            "text": self._content.text or "Alert Triggered",
            "description": self._content.description,
            "url": self._content.url,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            # Add any additional fields you need
        }

        headers = {"Content-Type": "application/json"}
        try:
            response = requests.post(
                webhook_url, json=payload, headers=headers, timeout=30
            )
            response.raise_for_status()
            logger.info("Successfully sent alert to webhook: %s", response.status_code)
        except requests.exceptions.RequestException as ex:
            logger.error("Failed to send alert to webhook: %s", ex)
            raise
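The payload construction in send() can be sanity-checked outside Superset by exercising the same logic against stand-in objects (SimpleNamespace plays the role of the recipient/content models here; the values are hypothetical). Note that the recipient_config_json column is a Text column holding a JSON string, so it needs json.loads before any .get lookup:

```python
import json
from types import SimpleNamespace

# Stand-ins for ReportRecipients / NotificationContent
recipient = SimpleNamespace(
    recipient_config_json='{"target": "https://example.com/hook"}'
)
content = SimpleNamespace(
    name="Webhook Alert",
    text="Alert Triggered",
    description="Webhook Alert notification",
    url="http://localhost:8088/superset/dashboard/15/",
)

# Parse the JSON string before looking up "target"
config = json.loads(recipient.recipient_config_json)
webhook_url = config.get("target")

payload = {
    "alert_name": content.name or "New Superset Alert",
    "text": content.text or "Alert Triggered",
    "description": content.description,
    "url": content.url,
}
print(json.dumps(payload, indent=2))
```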

Alert Configuration: I’ve set up alerts that should trigger based on certain conditions in my database.

[screenshot: alert configuration in the Superset UI]

Basically, I created this alert via the REST API — POST http://0.0.0.0:8088/api/v1/report/ (for creating alerts) — and the resulting report schedule looks like this:

{
"description_columns": {},
"id": 2,
"label_columns": {
"active": "Active",
"chart.id": "Chart Id",
"chart.slice_name": "Chart Slice Name",
"chart.viz_type": "Chart Viz Type",
"context_markdown": "Context Markdown",
"creation_method": "Creation Method",
"crontab": "Crontab",
"dashboard.dashboard_title": "Dashboard Dashboard Title",
"dashboard.id": "Dashboard Id",
"database.database_name": "Database Database Name",
"database.id": "Database Id",
"description": "Description",
"extra": "Extra",
"force_screenshot": "Force Screenshot",
"grace_period": "Grace Period",
"id": "Id",
"last_eval_dttm": "Last Eval Dttm",
"last_state": "Last State",
"last_value": "Last Value",
"last_value_row_json": "Last Value Row Json",
"log_retention": "Log Retention",
"name": "Name",
"owners.first_name": "Owners First Name",
"owners.id": "Owners Id",
"owners.last_name": "Owners Last Name",
"recipients.id": "Recipients Id",
"recipients.recipient_config_json": "Recipients Recipient Config Json",
"recipients.type": "Recipients Type",
"report_format": "Report Format",
"sql": "Sql",
"timezone": "Timezone",
"type": "Type",
"validator_config_json": "Validator Config Json",
"validator_type": "Validator Type",
"working_timeout": "Working Timeout"
},
"result": {
"active": true,
"chart": null,
"context_markdown": "string",
"creation_method": "alerts_reports",
"crontab": "* * * * *",
"dashboard": {
"dashboard_title": "Zabbix Alarms",
"id": 15
},
"database": {
"database_name": "PostgreSQL",
"id": 2
},
"description": "Webhook Alert notification",
"extra": {},
"force_screenshot": false,
"grace_period": 14400,
"id": 2,
"last_eval_dttm": null,
"last_state": "Not triggered",
"last_value": null,
"last_value_row_json": null,
"log_retention": 90,
"name": "Webhook Alert",
"owners": [
{
"first_name": "admin",
"id": 1,
"last_name": "user"
}
],
"recipients": [
{
"id": 36,
"recipient_config_json": "{\"target\": \"https://webhook.site/d98c5948-3a9f-4cca-ab27-f251033e6956\"}",
"type": "Webhook"
}
],
"report_format": "PNG",
"sql": "SELECT \"value_column\" \r\nFROM inodm.dummy_table\r\nWHERE \"insertion_timestamp\" = (SELECT MAX(\"insertion_timestamp\") FROM inodm.dummy_table)",
"timezone": "Asia/Kolkata",
"type": "Alert",
"validator_config_json": "{\"op\": \">\", \"threshold\": 20.0}",
"validator_type": "operator",
"working_timeout": 3600
},
"show_columns": [
"id",
"active",
"chart.id",
"chart.slice_name",
"chart.viz_type",
"context_markdown",
"creation_method",
"crontab",
"dashboard.dashboard_title",
"dashboard.id",
"database.database_name",
"database.id",
"description",
"extra",
"force_screenshot",
"grace_period",
"last_eval_dttm",
"last_state",
"last_value",
"last_value_row_json",
"log_retention",
"name",
"owners.first_name",
"owners.id",
"owners.last_name",
"recipients.id",
"recipients.recipient_config_json",
"recipients.type",
"report_format",
"sql",
"timezone",
"type",
"validator_config_json",
"validator_type",
"working_timeout"
],
"show_title": "Show Report Schedule"
}
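For reference, the body actually POSTed to /api/v1/report/ is much smaller than the response shown above — roughly the writable fields only. A sketch of assembling one (field names are taken from the response above, so treat them as assumptions; the key detail is that nested JSON like the recipient config must itself be a JSON-encoded string):

```python
import json

create_body = {
    "type": "Alert",
    "name": "Webhook Alert",
    "description": "Webhook Alert notification",
    "active": True,
    "creation_method": "alerts_reports",
    "crontab": "* * * * *",
    "timezone": "Asia/Kolkata",
    "database": 2,
    "dashboard": 15,
    "sql": 'SELECT "value_column" FROM inodm.dummy_table',
    "validator_type": "operator",
    "validator_config_json": json.dumps({"op": ">", "threshold": 20.0}),
    "report_format": "PNG",
    "grace_period": 14400,
    "working_timeout": 3600,
    "recipients": [
        {
            "type": "Webhook",
            # nested config is serialized to a string, not a raw object
            "recipient_config_json": json.dumps(
                {"target": "https://webhook.site/d98c5948-3a9f-4cca-ab27-f251033e6956"}
            ),
        }
    ],
}

body = json.dumps(create_body)
# round-trips cleanly, with the nested config still a string
print(json.loads(body)["recipients"][0]["recipient_config_json"])
```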

Database Testing: I’ve mocked the data by adding records to the database and verified that the alerts should be triggered when the conditions are met.

Issues Encountered:
Despite being able to successfully send requests to the webhook endpoint using Postman, I'm not receiving any alerts from Superset.
I have both Celery and Redis running to handle background tasks, but it seems the alert mechanism is not working as expected.

superset/reports/models.py

import enum

from cron_descriptor import get_description
from flask_appbuilder import Model
from flask_appbuilder.models.decorators import renders
from sqlalchemy import (
    Boolean,
    Column,
    DateTime,
    Float,
    ForeignKey,
    Integer,
    String,
    Table,
    Text,
)
from sqlalchemy.orm import backref, relationship
from sqlalchemy.schema import UniqueConstraint
from sqlalchemy_utils import UUIDType

from superset.extensions import security_manager
from superset.models.core import Database
from superset.models.dashboard import Dashboard
from superset.models.helpers import AuditMixinNullable, ExtraJSONMixin
from superset.models.slice import Slice
from superset.reports.types import ReportScheduleExtra

metadata = Model.metadata  # pylint: disable=no-member


class ReportScheduleType(str, enum.Enum):
    ALERT = "Alert"
    REPORT = "Report"


class ReportScheduleValidatorType(str, enum.Enum):
    """Validator types for alerts"""

    NOT_NULL = "not null"
    OPERATOR = "operator"


class ReportRecipientType(str, enum.Enum):
    EMAIL = "Email"
    SLACK = "Slack"
    WEBHOOK = "Webhook"  # New type for REST API notifications


class ReportState(str, enum.Enum):
    SUCCESS = "Success"
    WORKING = "Working"
    ERROR = "Error"
    NOOP = "Not triggered"
    GRACE = "On Grace"


class ReportDataFormat(str, enum.Enum):
    VISUALIZATION = "PNG"
    DATA = "CSV"
    TEXT = "TEXT"


class ReportCreationMethod(str, enum.Enum):
    CHARTS = "charts"
    DASHBOARDS = "dashboards"
    ALERTS_REPORTS = "alerts_reports"


class ReportSourceFormat(str, enum.Enum):
    CHART = "chart"
    DASHBOARD = "dashboard"


report_schedule_user = Table(
    "report_schedule_user",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("user_id", Integer, ForeignKey("ab_user.id"), nullable=False),
    Column(
        "report_schedule_id", Integer, ForeignKey("report_schedule.id"), nullable=False
    ),
    UniqueConstraint("user_id", "report_schedule_id"),
)


class ReportSchedule(Model, AuditMixinNullable, ExtraJSONMixin):
    """
    Report Schedules, supports alerts and reports
    """

    __tablename__ = "report_schedule"
    __table_args__ = (UniqueConstraint("name", "type"),)

    id = Column(Integer, primary_key=True)
    type = Column(String(50), nullable=False)
    name = Column(String(150), nullable=False)
    description = Column(Text)
    context_markdown = Column(Text)
    active = Column(Boolean, default=True, index=True)
    crontab = Column(String(1000), nullable=False)
    creation_method = Column(
        String(255), server_default=ReportCreationMethod.ALERTS_REPORTS
    )
    timezone = Column(String(100), default="UTC", nullable=False)
    report_format = Column(String(50), default=ReportDataFormat.VISUALIZATION)
    sql = Column(Text())
    # (Alerts/Reports) M-O to chart
    chart_id = Column(Integer, ForeignKey("slices.id"), nullable=True)
    chart = relationship(Slice, backref="report_schedules", foreign_keys=[chart_id])
    # (Alerts/Reports) M-O to dashboard
    dashboard_id = Column(Integer, ForeignKey("dashboards.id"), nullable=True)
    dashboard = relationship(
        Dashboard, backref="report_schedules", foreign_keys=[dashboard_id]
    )
    # (Alerts) M-O to database
    database_id = Column(Integer, ForeignKey("dbs.id"), nullable=True)
    database = relationship(Database, foreign_keys=[database_id])
    owners = relationship(security_manager.user_model, secondary=report_schedule_user)

    # (Alerts) Stamped last observations
    last_eval_dttm = Column(DateTime)
    last_state = Column(String(50), default=ReportState.NOOP)
    last_value = Column(Float)
    last_value_row_json = Column(Text)

    # (Alerts) Observed value validation related columns
    validator_type = Column(String(100))
    validator_config_json = Column(Text, default="{}")
    # Log retention
    log_retention = Column(Integer, default=90)
    # (Alerts) After a success how long to wait for a new trigger (seconds)
    grace_period = Column(Integer, default=60 * 60 * 4)
    # (Alerts/Reports) Unlock a possible stalled working state
    working_timeout = Column(Integer, default=60 * 60 * 1)

    # (Reports) When generating a screenshot, bypass the cache?
    force_screenshot = Column(Boolean, default=False)

    extra: ReportScheduleExtra  # type: ignore

    def __repr__(self) -> str:
        return str(self.name)

    @renders("crontab")
    def crontab_humanized(self) -> str:
        return get_description(self.crontab)


class ReportRecipients(Model, AuditMixinNullable):
    """
    Report Recipients, meant to support multiple notification types, eg: Slack, email
    """

    __tablename__ = "report_recipient"
    id = Column(Integer, primary_key=True)
    type = Column(String(50), nullable=False)
    recipient_config_json = Column(Text, default="{}")
    report_schedule_id = Column(
        Integer, ForeignKey("report_schedule.id"), nullable=False
    )
    report_schedule = relationship(
        ReportSchedule,
        backref=backref("recipients", cascade="all,delete,delete-orphan"),
        foreign_keys=[report_schedule_id],
    )


class ReportExecutionLog(Model):  # pylint: disable=too-few-public-methods

    """
    Report Execution Log, hold the result of the report execution with timestamps,
    last observation and possible error messages
    """

    __tablename__ = "report_execution_log"
    id = Column(Integer, primary_key=True)
    uuid = Column(UUIDType(binary=True))

    # Timestamps
    scheduled_dttm = Column(DateTime, nullable=False)
    start_dttm = Column(DateTime)
    end_dttm = Column(DateTime)

    # (Alerts) Observed values
    value = Column(Float)
    value_row_json = Column(Text)

    state = Column(String(50), nullable=False)
    error_message = Column(Text)

    report_schedule_id = Column(
        Integer, ForeignKey("report_schedule.id"), nullable=False
    )
    report_schedule = relationship(
        ReportSchedule,
        backref=backref("logs", cascade="all,delete,delete-orphan"),
        foreign_keys=[report_schedule_id],
    )

superset/reports/notifications/base.py

from dataclasses import dataclass
from typing import Any, List, Optional, Type

import pandas as pd

from superset.reports.models import ReportRecipients, ReportRecipientType
from superset.utils.core import HeaderDataType


@dataclass
class NotificationContent:
    name: str
    header_data: HeaderDataType  # this is optional to account for error states
    csv: Optional[bytes] = None  # bytes for csv file
    screenshots: Optional[List[bytes]] = None  # bytes for a list of screenshots
    text: Optional[str] = None
    description: Optional[str] = ""
    url: Optional[str] = None  # url to chart/dashboard for this screenshot
    embedded_data: Optional[pd.DataFrame] = None


class BaseNotification:  # pylint: disable=too-few-public-methods
    """
    Serves as the base for all notifications and creates a simple plugin system
    for extending future implementations.
    Child implementations get automatically registered and should identify the
    notification type
    """

    plugins: List[Type["BaseNotification"]] = []
    type: Optional[ReportRecipientType] = None
    """
    Child classes set their notification type ex: `type = "email"` this string will be
    used by ReportRecipients.type to map to the correct implementation
    """

    def __init_subclass__(cls, *args: Any, **kwargs: Any) -> None:
        super().__init_subclass__(*args, **kwargs)
        cls.plugins.append(cls)

    def __init__(
        self, recipient: ReportRecipients, content: NotificationContent
    ) -> None:
        self._recipient = recipient
        self._content = content

    def send(self) -> None:
        raise NotImplementedError()
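The __init_subclass__ hook above means a subclass only lands in BaseNotification.plugins if its module is actually imported at startup — if webhook.py is never imported anywhere, the plugin silently never registers. The dispatcher (in superset/reports/notifications/__init__.py) then matches each recipient's type against the registered plugins. A self-contained sketch of that mechanism (the names mirror the real classes, but this is a simplified model, not Superset's code):

```python
import enum
from types import SimpleNamespace
from typing import Any, List, Optional, Type


class ReportRecipientType(str, enum.Enum):
    EMAIL = "Email"
    WEBHOOK = "Webhook"


class BaseNotification:
    plugins: List[Type["BaseNotification"]] = []
    type: Optional[ReportRecipientType] = None

    def __init_subclass__(cls, *args: Any, **kwargs: Any) -> None:
        super().__init_subclass__(*args, **kwargs)
        cls.plugins.append(cls)  # registration happens at import time

    def __init__(self, recipient: Any, content: Any) -> None:
        self._recipient = recipient
        self._content = content


# Defining (i.e. importing) the subclass is what registers it
class WebhookNotification(BaseNotification):
    type = ReportRecipientType.WEBHOOK


def create_notification(recipient: Any, content: Any) -> BaseNotification:
    """Pick the plugin whose type matches the recipient row's type."""
    for plugin in BaseNotification.plugins:
        if plugin.type == recipient.type:
            return plugin(recipient, content)
    raise ValueError(f"No notification handler for {recipient.type}")


notification = create_notification(
    SimpleNamespace(type=ReportRecipientType.WEBHOOK), content=None
)
print(type(notification).__name__)  # WebhookNotification
```

The practical takeaway: whatever mechanism you use to register the webhook handler, verify the module is imported before the worker dispatches notifications, otherwise the recipient type will not resolve.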

superset_config.py

from celery.schedules import crontab
import logging
from logging.handlers import RotatingFileHandler
from superset.tasks.types import ExecutorType

# Superset specific config
ROW_LIMIT = 5000

# Flask App Builder configuration
# Your App secret key will be used for securely signing the session cookie
# and encrypting sensitive information on the database
# Make sure you are changing this key for your deployment with a strong key.
# Alternatively you can set it with `SUPERSET_SECRET_KEY` environment variable.
# You MUST set this for production environments or the server will refuse
# to start and you will see an error in the logs accordingly.
SECRET_KEY = '*****************************************************************'

# The SQLAlchemy connection string to your database backend
# This connection defines the path to the database that stores your
# superset metadata (slices, connections, tables, dashboards, ...).
# Note that the connection information to connect to the datasources
# you want to explore are managed directly in the web UI
# The check_same_thread=false property ensures the sqlite client does not attempt
# to enforce single-threaded access, which may be problematic in some edge cases
SQLALCHEMY_DATABASE_URI = 'sqlite:////app/superset/superset.db?check_same_thread=false'

TALISMAN_ENABLED = False
WTF_CSRF_ENABLED = False

# Set this API key to enable Mapbox visualizations
MAPBOX_API_KEY = ''
FEATURE_FLAGS = {
    "ALERT_REPORTS": True,
    "DASHBOARD_CROSS_FILTERS": True,
    "DASHBOARD_VIRTUALIZATION": True,
    "EMBEDDED_SUPERSET": False,
    "ALERT_REPORT_TABS": False,
    "ALERT_REPORT_SLACK_V2": False,
    "ENABLE_ADVANCED_DATA_TYPES": False,
    "ALERTS_ATTACH_REPORTS": True,
    "ALLOW_FULL_CSV_EXPORT": False,
    "ALLOW_ADHOC_SUBQUERY": False,
    "EMBEDDABLE_CHARTS": True,
    "DRILL_TO_DETAIL": True,
    "DRILL_BY": True,
    "HORIZONTAL_FILTER_BAR": True,
    "ENABLE_SCHEDULED_REPORTS": True,
    "ALERT_REPORTS_NOTIFICATION_DRY_RUN": False,
    "DATAPANEL_CLOSED_BY_DEFAULT": False,
}


REDIS_HOST = "localhost"
REDIS_PORT = "6379"

class CeleryConfig:
    broker_url = f"redis://{REDIS_HOST}:{REDIS_PORT}/0"
    broker_connection_retry_on_startup = True
    imports = (
        "superset.sql_lab",
        "superset.tasks.scheduler",
    )
    result_backend = f"redis://{REDIS_HOST}:{REDIS_PORT}/0"
    worker_prefetch_multiplier = 10
    task_acks_late = True
    task_annotations = {
        "sql_lab.get_sql_results": {
            "rate_limit": "100/s",
        },
    }
    beat_schedule = {
        "reports.scheduler": {
            "task": "reports.scheduler",
            "schedule": crontab(minute="*", hour="*"),
        },
        "reports.prune_log": {
            "task": "reports.prune_log",
            "schedule": crontab(minute=0, hour=0),
        },
    }
CELERY_CONFIG = CeleryConfig

ALERT_NOTIFICATION_METHODS = {
    'Webhook': 'superset.reports.notifications.webhook.WebhookNotification',
}

WEBDRIVER_TYPE = "chrome"
WEBDRIVER_OPTION_ARGS = [
    "--force-device-scale-factor=2.0",
    "--high-dpi-support=2.0",
    "--headless",
    "--disable-gpu",
    "--disable-dev-shm-usage",
    "--no-sandbox",
    "--disable-setuid-sandbox",
    "--disable-extensions",
]

WEBDRIVER_BASEURL = "http://localhost:8088"
WEBDRIVER_BASEURL_USER_FRIENDLY = "http://localhost:8088"

THUMBNAIL_SELENIUM_USER = 'admin'
ALERT_REPORTS_EXECUTE_AS = [ExecutorType.SELENIUM]

# Define the log file location and settings
LOG_FORMAT = '%(asctime)s:%(levelname)s:%(name)s:%(message)s'

logging.basicConfig(format=LOG_FORMAT, level=logging.INFO)
logger = logging.getLogger(__name__)

logger.info("superset_config.py loaded successfully.")

# Enable file logging
LOG_FILE = '/var/log/superset/superset.log'  # Change the path as needed

file_handler = RotatingFileHandler(LOG_FILE, maxBytes=10000000, backupCount=5)
file_handler.setLevel(logging.INFO)
file_handler.setFormatter(logging.Formatter(LOG_FORMAT))

logging.getLogger().addHandler(file_handler)
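With this config, alerts only fire if both a Celery worker and the beat scheduler are running against the same Redis broker. These are the launch commands from the Superset Alerts & Reports documentation (treated here as an operational fragment; they assume superset_config.py is on PYTHONPATH for both processes):

```shell
# Worker: executes the scheduled alert/report tasks
celery --app=superset.tasks.celery_app:app worker --pool=prefork -O fair -c 4

# Beat: enqueues reports.scheduler every minute per the beat_schedule above
celery --app=superset.tasks.celery_app:app beat
```

If either process is missing, or the two load different configs, schedules will sit in the metadata database without ever executing — worth ruling out before debugging the notifier itself.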

Celery and Redis are running correctly:

[screenshot: Celery worker and beat process output]

Request for Help:

I would appreciate any insights or suggestions on troubleshooting this issue.
Specifically, I want to confirm that my implementation is aligned with the correct alerting process in Superset.
If anyone has experience with similar custom notifications, your advice would be invaluable.
Thank you in advance for your assistance!

@fisjac @kistoth90 @michael-s-molina @dpgaspar

@andy1xx8

Big vote for this feature.

@Bindu-yadav8
Author

Hello Community,

Any update on the above request?

@qingshan007

Your ideas are great. The feature I'm developing happens to include what you mentioned. A webhook is a good fit, because enterprise instant-messaging integrations are too fragmented to support one by one.

Additionally, the alert data will be passed to a fine-tuned large language model, which generates an analysis report that is pushed along with the alert after hyperlinks are added. The plan is to encrypt private data with a salted SHA3-based scheme to ensure data security. This feature is still in development; once it is complete, I will contribute it.

Versions:
python: 3.9
superset: 3.0.3


@ilsaloving

ilsaloving commented Oct 17, 2024

One extremely useful feature would be if, instead of constructing the webhook payload directly in python, you allowed users to provide a Jinja template. Then you could feed the alert results into that template. That would be much more versatile than having people maintain custom python libraries.
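The template idea is straightforward to prototype: feed the alert fields into a user-supplied Jinja template and POST the rendered result. A minimal sketch with jinja2 (the template text, field names, and the JSON-shaped output are all illustrative choices, not Superset's):

```python
import json
from jinja2 import Template

# A user-supplied template, stored e.g. in the recipient config
template_text = """
{
  "summary": "{{ name }}: {{ text }}",
  "level": "{{ 'critical' if value > threshold else 'info' }}",
  "link": "{{ url }}"
}
"""

# Fields the alert executor would supply
alert_context = {
    "name": "Webhook Alert",
    "text": "Alert Triggered",
    "value": 42.0,
    "threshold": 20.0,
    "url": "http://localhost:8088/superset/dashboard/15/",
}

rendered = Template(template_text).render(**alert_context)
payload = json.loads(rendered)  # validate that the rendered output is valid JSON
print(payload["level"])  # critical
```

Validating the rendered output (as the json.loads step does here) matters, since a user-authored template can easily produce malformed payloads.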

Also, maybe tap into the existing alerts UI instead of making your own?
