Papermill

Apache Airflow supports integration with Papermill, a tool for parameterizing and executing Jupyter notebooks. Suppose you have a financial report that you want to run with different values on the first or last day of a month, or at the beginning or end of the year. Adding parameters to your notebook and running it with the PapermillOperator makes this a breeze.

Usage

Creating a notebook

To parameterize your notebook, tag one cell with "parameters". Papermill treats the values assigned in that cell as defaults. At execution time, Papermill adds a new cell tagged "injected-parameters" immediately after the "parameters" cell; this cell assigns the values passed in, so they override the defaults. If no cell is tagged "parameters", the injected cell is inserted at the top of the notebook.
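The injection rule described above can be illustrated with a small, self-contained model. This is a simplified sketch, not Papermill's implementation: it represents a notebook as plain dicts in the nbformat v4 layout, and the helper names make_code_cell and inject_parameters, as well as the way values are rendered as assignments, are hypothetical.

```python
import copy


# Simplified model of a notebook: nbformat v4 JSON layout as plain dicts.
# This is NOT Papermill's implementation; it only mirrors the documented rule.
def make_code_cell(source, tags=()):
    return {
        "cell_type": "code",
        "execution_count": None,
        "metadata": {"tags": list(tags)},
        "outputs": [],
        "source": source,
    }


def inject_parameters(notebook, params):
    """Return a copy of `notebook` with an "injected-parameters" cell added."""
    nb = copy.deepcopy(notebook)
    # Hypothetical rendering of the passed-in values as Python assignments.
    source = "\n".join(f"{name} = {value!r}" for name, value in params.items())
    injected = make_code_cell(source, tags=["injected-parameters"])

    cells = nb["cells"]
    for index, cell in enumerate(cells):
        if "parameters" in cell["metadata"].get("tags", []):
            # Place the overrides right after the defaults so they win.
            cells.insert(index + 1, injected)
            break
    else:
        # No cell tagged "parameters": inject at the top of the notebook.
        cells.insert(0, injected)
    return nb


notebook = {
    "cells": [
        make_code_cell('title = "Monthly report"'),
        make_code_cell('msgs = "default message"', tags=["parameters"]),
        make_code_cell("print(msgs)"),
    ],
    "metadata": {},
    "nbformat": 4,
    "nbformat_minor": 4,
}

executed = inject_parameters(notebook, {"msgs": "Ran from Airflow"})
for cell in executed["cells"]:
    print(cell["metadata"]["tags"], cell["source"])
```

Because the injected cell runs after the defaults, the later assignment to msgs is the one the rest of the notebook sees.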

Make sure you save your notebook somewhere Airflow can access it. Papermill supports S3, GCS, Azure, and local storage; HDFS is not supported.
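One way to catch an unreachable location early is a pre-flight check on the notebook path's URL scheme. The sketch below is hypothetical: the function is_reachable_location is not part of Airflow or Papermill, and the scheme set is an assumption ("s3" and "gs" for S3 and GCS, "abs" and "adl" for Azure Blob Storage and Azure Data Lake as we understand Papermill's I/O handlers). Check the Papermill documentation for the version you run.

```python
from urllib.parse import urlparse

# Assumption: URL schemes for the remote backends listed above. Verify them
# against the Papermill docs for your installed version.
SUPPORTED_SCHEMES = {"s3", "gs", "abs", "adl"}


def is_reachable_location(path):
    """Hypothetical pre-flight check for a notebook path used in a DAG."""
    scheme = urlparse(path).scheme
    if scheme == "":
        # Plain local path: accepted here, but the file must also be present
        # on whichever worker actually runs the task.
        return True
    # Anything else (for example "hdfs") is rejected.
    return scheme in SUPPORTED_SCHEMES


for candidate in ["/tmp/hello_world.ipynb", "s3://reports/input.ipynb", "hdfs://nn/input.ipynb"]:
    print(candidate, is_reachable_location(candidate))
```

A local path only works when every worker that might pick up the task can see the same file, which is why shared object storage is often the safer choice.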

Example DAG

Use the PapermillOperator to execute a Jupyter notebook:

tests/system/papermill/example_papermill.py

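from airflow.providers.papermill.operators.papermill import PapermillOperator

# Created inside a DAG context, as in the full example file.
run_this = PapermillOperator(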
    task_id="run_example_notebook",
    input_nb="/tmp/hello_world.ipynb",
    output_nb="/tmp/out-{{ execution_date }}.ipynb",
    parameters={"msgs": "Ran from Airflow at {{ execution_date }}!"},
)
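The output_nb value is templated, so each DAG run writes its own output notebook instead of overwriting the previous one. The sketch below is a simplified, hypothetical stand-in for Airflow's Jinja rendering: it substitutes an ISO 8601 timestamp for the placeholder, whereas Airflow's actual rendering of execution_date may be formatted differently.

```python
import re
from datetime import datetime, timezone

# Simplified stand-in for Airflow's Jinja rendering of "{{ execution_date }}".
# It only shows why a per-run placeholder keeps output notebooks apart.
PLACEHOLDER = re.compile(r"\{\{\s*execution_date\s*\}\}")


def render_output_path(template, run_date):
    """Replace the execution_date placeholder with an ISO 8601 timestamp."""
    return PLACEHOLDER.sub(run_date.isoformat(), template)


template = "/tmp/out-{{ execution_date }}.ipynb"
jan = render_output_path(template, datetime(2021, 1, 1, tzinfo=timezone.utc))
feb = render_output_path(template, datetime(2021, 2, 1, tzinfo=timezone.utc))
print(jan)  # /tmp/out-2021-01-01T00:00:00+00:00.ipynb
print(feb)  # /tmp/out-2021-02-01T00:00:00+00:00.ipynb
```

The same placeholder in parameters ties the message inside the notebook to the run that produced it, which is what the verification example relies on.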

Example DAG to verify the message in the notebook:

tests/system/papermill/example_papermill_verify.py

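import os

import scrapbook as sb

from airflow import DAG
from airflow.decorators import task
from airflow.lineage import AUTO
from airflow.providers.papermill.operators.papermill import PapermillOperator

# SCHEDULE_INTERVAL, START_DATE and DAGRUN_TIMEOUT are module-level constants
# defined elsewhere in the example file.


@task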
def check_notebook(inlets, logical_date):
    """
    Verify the message in the notebook
    """
    notebook = sb.read_notebook(inlets[0].url)
    message = notebook.scraps["message"]
    print(f"Message in notebook {message} for {logical_date}")

    if message.data != f"Ran from Airflow at {logical_date}!":
        return False

    return True


with DAG(
    dag_id="example_papermill_operator_verify",
    schedule=SCHEDULE_INTERVAL,
    start_date=START_DATE,
    dagrun_timeout=DAGRUN_TIMEOUT,
    catchup=False,
) as dag:
    run_this = PapermillOperator(
        task_id="run_example_notebook",
        input_nb=os.path.join(os.path.dirname(os.path.realpath(__file__)), "input_notebook.ipynb"),
        output_nb="/tmp/out-{{ logical_date }}.ipynb",
        parameters={"msgs": "Ran from Airflow at {{ logical_date }}!"},
    )

    run_this >> check_notebook(inlets=AUTO, logical_date="{{ logical_date }}")
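The check_notebook task expects the executed notebook to contain a scrapbook scrap named "message". The contents of input_notebook.ipynb are not shown here, so the following is a hypothetical minimal notebook that would satisfy that expectation: a "parameters" cell defining msgs, and a cell that records it with scrapbook's sb.glue. The sketch only writes the notebook JSON with the standard library (to the temp directory, for illustration); it does not execute it.

```python
import json
import tempfile
from pathlib import Path


# Hypothetical contents for input_notebook.ipynb. The real file used by the
# example is not shown; this only provides what check_notebook reads back:
# a `msgs` parameter and a scrap named "message".
def code_cell(source, tags=()):
    return {
        "cell_type": "code",
        "execution_count": None,
        "metadata": {"tags": list(tags)},
        "outputs": [],
        "source": source,
    }


notebook = {
    "cells": [
        # Defaults; Papermill injects the DAG's parameters after this cell.
        code_cell('msgs = "Hello!"', tags=["parameters"]),
        # Record the value so sb.read_notebook(...).scraps["message"] finds it.
        code_cell('import scrapbook as sb\nsb.glue("message", msgs)'),
    ],
    "metadata": {
        "kernelspec": {"name": "python3", "display_name": "Python 3", "language": "python"}
    },
    "nbformat": 4,
    "nbformat_minor": 4,
}

out_path = Path(tempfile.gettempdir()) / "input_notebook.ipynb"
out_path.write_text(json.dumps(notebook, indent=1))
print(f"Wrote {out_path}")
```

In the example DAG the input notebook sits next to the DAG file, so a real version of this file would be saved there rather than in the temp directory.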

Example DAG to verify the message in the notebook using a remote Jupyter kernel:

tests/system/papermill/example_papermill_remote_verify.py

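import os

import scrapbook as sb

from airflow import DAG
from airflow.decorators import task
from airflow.providers.papermill.operators.papermill import PapermillOperator

# START_DATE and DAGRUN_TIMEOUT are module-level constants defined elsewhere
# in the example file.


@task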
def check_notebook(output_notebook, execution_date):
    """
    Verify the message in the notebook
    """
    notebook = sb.read_notebook(output_notebook)
    message = notebook.scraps["message"]
    print(f"Message in notebook {message} for {execution_date}")

    if message.data != f"Ran from Airflow at {execution_date}!":
        return False

    return True


with DAG(
    dag_id="example_papermill_operator_remote_verify",
    schedule="@once",
    start_date=START_DATE,
    dagrun_timeout=DAGRUN_TIMEOUT,
    catchup=False,
) as dag:
    run_this = PapermillOperator(
        task_id="run_example_notebook",
        input_nb=os.path.join(os.path.dirname(os.path.realpath(__file__)), "input_notebook.ipynb"),
        output_nb="/tmp/out-{{ execution_date }}.ipynb",
        parameters={"msgs": "Ran from Airflow at {{ execution_date }}!"},
        kernel_conn_id="jupyter_kernel_default",
    )

    run_this >> check_notebook(
        output_notebook="/tmp/out-{{ execution_date }}.ipynb", execution_date="{{ execution_date }}"
    )
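The remote example refers to a connection with the ID jupyter_kernel_default. The sketch below shows one way such a connection might be supplied through an environment variable, using the JSON connection format that Airflow 2.3+ accepts in AIRFLOW_CONN_<CONN_ID> variables. The host, session key, extra field names, and port values are assumptions: the ports and key must match the remote kernel's connection file, and the extra names should be checked against the Papermill provider's connection documentation for your version.

```python
import json
import os

# Assumptions (verify against the Papermill provider's connection docs):
# - the kernel hook reads the host plus port/session extras from the connection;
# - the extra field names and the 60316-60320 ports below match your setup.
# The host and session key are placeholders.
kernel_conn = {
    "conn_type": "jupyter_kernel",
    "host": "jupyter.example.internal",
    "extra": {
        "shell_port": 60316,
        "iopub_port": 60317,
        "stdin_port": 60318,
        "control_port": 60319,
        "hb_port": 60320,
        "session_key": "replace-me",
    },
}

# The conn_id is upper-cased in the environment variable name.
os.environ["AIRFLOW_CONN_JUPYTER_KERNEL_DEFAULT"] = json.dumps(kernel_conn)
print(os.environ["AIRFLOW_CONN_JUPYTER_KERNEL_DEFAULT"])
```

In a real deployment the variable would be set in the scheduler and worker environment (or the connection created through the UI or CLI) rather than from Python at runtime.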
