[`airflow`] Update `AIR302` to check for deprecated context keys (#15144)
**Summary**
Airflow 3.0 removes a set of deprecated context variables that were
phased out in 2.x. This PR introduces lint rules to detect usage of
these removed variables in various patterns, helping identify
incompatibilities. The removed context variables include:
```
conf
execution_date
next_ds
next_ds_nodash
next_execution_date
prev_ds
prev_ds_nodash
prev_execution_date
prev_execution_date_success
tomorrow_ds
yesterday_ds
yesterday_ds_nodash
```
**Detected Patterns and Examples**
The linter now flags the use of removed context variables in the
following scenarios:
1. **Direct Subscript Access**
```python
execution_date = context["execution_date"] # Flagged
```
2. **`.get("key")` Method Calls**
```python
print(context.get("execution_date")) # Flagged
```
3. **Variables Assigned from `get_current_context()`**
If a variable is assigned from `get_current_context()` and then used to
access a removed key:
```python
c = get_current_context()
print(c.get("execution_date")) # Flagged
```
4. **Function Parameters in `@task`-Decorated Functions**
Parameters named after removed context variables in functions decorated
with `@task` are flagged:
```python
from airflow.decorators import task
@task
def my_task(execution_date, **kwargs): # Parameter 'execution_date'
flagged
pass
```
5. **Removed Keys in Task Decorator `kwargs` and Other Scenarios**
Other similar patterns where removed context variables appear (e.g., as
part of `kwargs` in a `@task` function) are also detected.
```
from airflow.decorators import task
@task
def process_with_execution_date(**context):
execution_date = lambda: context["execution_date"] # flagged
print(execution_date)
@task(kwargs={"execution_date": "2021-01-01"}) # flagged
def task_with_kwargs(**context):
pass
```
**Test Plan**
Test fixtures covering various patterns of deprecated context usage are
included in this PR. For example:
```python
from airflow.decorators import task, dag, get_current_context
from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
import pendulum
from datetime import datetime
@task
def access_invalid_key_task(**context):
print(context.get("conf")) # 'conf' flagged
@task
def print_config(**context):
execution_date = context["execution_date"] # Flagged
prev_ds = context["prev_ds"] # Flagged
@task
def from_current_context():
context = get_current_context()
print(context["execution_date"]) # Flagged
# Usage outside of a task decorated function
c = get_current_context()
print(c.get("execution_date")) # Flagged
@task
def some_task(execution_date, **kwargs):
print("execution date", execution_date) # Parameter flagged
@dag(
start_date=pendulum.datetime(2021, 1, 1, tz="UTC")
)
def my_dag():
task1 = DummyOperator(
task_id="task1",
params={
"execution_date": "{{ execution_date }}", # Flagged in template context
},
)
access_invalid_key_task()
print_config()
from_current_context()
dag = my_dag()
class CustomOperator(BaseOperator):
def execute(self, context):
execution_date = context.get("execution_date") # Flagged
next_ds = context.get("next_ds") # Flagged
next_execution_date = context["next_execution_date"] # Flagged
```
Ruff will emit `AIR302` diagnostics for each deprecated usage, with
suggestions when applicable, aiding in code migration to Airflow 3.0.
related: https://github.com/apache/airflow/issues/44409,
https://github.com/apache/airflow/issues/41641
---------
Co-authored-by: Wei Lee <weilee.rx@gmail.com>