# Apache Airflow

For researchers who require scheduled workflows, Nuvolos supports Airflow as a self-service application. Airflow runs inside a JupyterLab application, making it easy to edit Airflow DAG files, install packages and use the Nuvolos filesystem for data processing.

The JupyterLab application is collaborative, so DAGs can be worked on simultaneously by multiple users in a "Google Docs"-like fashion.

### Configuration

DAGs should be created as Python files in the `/files/airflow/dags` folder; [refer to the Airflow documentation](https://airflow.apache.org/docs/apache-airflow/stable/tutorial.html) for an example.

#### Setting up your first DAG

1. Create a new Python file named `/files/airflow/dags/tutorial.py` and copy the contents of the tutorial DAG from [the Airflow tutorial](https://airflow.apache.org/docs/apache-airflow/stable/tutorial.html#example-pipeline-definition).
2. Click on the Airflow tab and select the **All DAGs** filter on the UI; the DAG should show up in the list as in the screenshot below. It can take up to a minute for the DAG to appear, as Airflow periodically scans the Python files in the `/files/airflow/dags` folder for new DAG definitions.
3. Click on the slider toggle next to the `tutorial` DAG name to enable the DAG and start the first execution.
4. You should quickly see that the DAG has executed successfully: a *1* in a green circle appears in the *Runs* column.

Airflow Connections and Variables can be configured on the Airflow UI.

Airflow on Nuvolos uses the CeleryExecutor backend so that tasks can be executed in parallel.

### Installing packages

To install packages used in DAGs, open a JupyterLab terminal and pip / conda / mamba install the required package. See the [Install a software package](/features/applications/install-a-software-package.md) chapter for detailed instructions.

### Logs

Task execution, scheduler and DAG bag update logs are in `/files/airflow/logs`.

### Saving data to Nuvolos

The following example DAG downloads CSV-style time series data from an API, saves it as a compressed Parquet file, and uploads it as a Nuvolos table. Airflow uses the database credentials of the user who started the application.

#### Prerequisites

1. Create and start a new Airflow application in your working instance.
2. Open a new terminal tab and install the required packages:
   1. `mamba install -y --freeze-installed -c conda-forge pandas-datareader`
   2. `mamba install -y --freeze-installed -c conda-forge pyarrow`
3. Save the example script as `/files/airflow/dags/csv_to_nuvolos.py`. After saving the file, wait a few seconds for the new DAG to appear in the Airflow tab.

```python
""" Example DAG to demonstrate how to download a time series as a CSV file, 
convert it to Parquet then upload it to Nuvolos. """
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable


def export_to_parquet(series, start, end):
    """Downloads a time series from the St. Louis Fed's FRED service and exports it as a Parquet file."""
    import pandas_datareader as pdr

    df = pdr.get_data_fred(
        series,
        start=datetime.strptime(start, "%Y-%m-%d"),
        end=datetime.strptime(end, "%Y-%m-%d"),
    )
    df.reset_index(inplace=True)
    df.to_parquet("/files/fred_data.parquet")
    

def upload_data():
    from nuvolos import get_connection, to_sql
    import pandas as pd
    
    df = pd.read_parquet("/files/fred_data.parquet")
    with get_connection() as conn:
        to_sql(df=df, name="fred_data", con=conn, if_exists='replace', index=False)


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'csv_to_nuvolos',
    default_args=default_args,
    description='CSV upload to Nuvolos example DAG',
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['example'],
) as dag:
    
    t1 = PythonOperator(
        task_id='export_to_parquet',
        python_callable=export_to_parquet,
        op_kwargs={
            "series": ["TB3MS"],
            "start": "1934-01-01",
            "end": "2021-10-01",
        },
    )
    t1.doc_md = """#### FRED data download
Downloads time-series data from FRED and saves them to /files/fred_data.parquet.
"""

    t2 = PythonOperator(
        task_id='upload_to_nuvolos',
        python_callable=upload_data
    )
    t2.doc_md = """#### Data upload to Nuvolos
Uses the [to_sql function](https://docs.nuvolos.cloud/data/upload-data-to-nuvolos#1.-python) of the Nuvolos connector to upload the data as a Nuvolos table.
"""
    t1 >> t2
```

4. Enable the `csv_to_nuvolos` DAG with the **toggle switch** next to its name.
5. Select the **play** button to trigger a DAG run manually.
6. Select the DAG name to open its detail view and monitor task progress. Completed tasks appear in dark green.
7. After the DAG succeeds, open the **Tables** view to verify that the output table was created.

### Airflow with VSCode

Airflow is also available bundled with VSCode, which can make DAG development easier. To use it:

1. In the applications gallery, choose the latest **Airflow + Code-server + Python** application.
2. After the application starts, open the **Command Palette** with `Ctrl + Shift + P` on Windows/Linux or `Command + Shift + P` on macOS.
3. Search for **Airflow** and select **Airflow: Show Airflow**.
4. Airflow opens in a new VSCode tab. Use that tab to view and manage DAGs.
5. To install additional Python dependencies, open a **Terminal** in VSCode and run:

   ```shell
   mamba install -y -c conda-forge --freeze-installed <package_name>
   ```

