A tutorial on how to extend Apache Airflow with custom Operators and Tasks, including examples of how to create and use these custom elements in your workflows.

Apache Airflow is a powerful platform for scheduling and orchestrating workflows, and a key part of that power is the ability to extend its core functionality with custom Operators and the Tasks built from them.

A custom Operator is a subclass of BaseOperator whose execute() method defines the work for a specific kind of task, while a Task is an instance of an Operator that runs as part of a workflow, i.e. a DAG.

Here is an example of a custom Operator that defines a task to check the status of a website:

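import urllib.error
import urllib.request

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator


class CheckWebsiteOperator(BaseOperator):
    # @apply_defaults is deprecated since Airflow 2.0; default_args are applied automatically
    def __init__(self, website, **kwargs):
        super().__init__(**kwargs)
        self.website = website

    def execute(self, context):
        # Check the status of the website with a plain GET request (stdlib urllib;
        # the 10-second timeout is an arbitrary choice)
        try:
            with urllib.request.urlopen(self.website, timeout=10) as response:
                status = response.status
        except urllib.error.HTTPError as err:
            # urlopen raises HTTPError for 4xx/5xx responses; the code is on the exception
            status = err.code

        if status != 200:
            raise AirflowException(f"Website returned status code {status}")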
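The website check itself does not depend on Airflow, so one option is to keep it in a plain function that can be tested without a scheduler or metadata database. The sketch below assumes a simple GET via the standard library's urllib.request is enough; get_status_code, _DemoHandler, and demo are hypothetical names, and a throwaway local server stands in for a real site.

```python
import threading
import urllib.error
import urllib.request
from http.server import BaseHTTPRequestHandler, HTTPServer


def get_status_code(url: str, timeout: float = 10.0) -> int:
    """Return the HTTP status code of a GET request to `url`.

    Hypothetical helper: the same check the operator needs, kept free of
    Airflow imports so it can be unit-tested on its own.
    """
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return response.status
    except urllib.error.HTTPError as err:
        # urlopen raises HTTPError for 4xx/5xx; the status code is on the exception
        return err.code


class _DemoHandler(BaseHTTPRequestHandler):
    """Tiny local server: 200 for "/", 404 for any other path."""

    def do_GET(self):
        self.send_response(200 if self.path == "/" else 404)
        self.end_headers()

    def log_message(self, format, *args):
        pass  # keep the demo output quiet


def demo():
    # Port 0 asks the OS for any free port
    server = HTTPServer(("127.0.0.1", 0), _DemoHandler)
    port = server.server_address[1]
    thread = threading.Thread(target=server.serve_forever, daemon=True)
    thread.start()
    try:
        ok = get_status_code(f"http://127.0.0.1:{port}/")
        missing = get_status_code(f"http://127.0.0.1:{port}/missing")
    finally:
        # Stop the serve_forever loop and release the socket
        server.shutdown()
        server.server_close()
    return ok, missing


if __name__ == "__main__":
    print(demo())  # (200, 404)
```

With the check factored out this way, the operator's execute() would only need to call the helper and raise AirflowException when the result is not 200.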
 

To use this custom operator in a workflow, instantiate it and attach it to a DAG (Directed Acyclic Graph); each operator instance becomes a task in that DAG:

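from airflow import DAG

# Hypothetical module name: assumes the operator above is saved as
# check_website_operator.py in a folder on Airflow's Python path (e.g. dags/ or plugins/)
from check_website_operator import CheckWebsiteOperator

dag = DAG(...)  # dag_id, start_date, schedule, etc. go here

check_website_task = CheckWebsiteOperator(
    task_id='check_website',
    website='https://www.example.com',
    dag=dag,
)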