Neo4jSensor¶
The Neo4jSensor executes a Cypher query
in a Neo4j database until the returned value satisfies a condition.
The sensor runs the query repeatedly at the defined poke_interval until:
A callable provided in failure evaluates to True, which raises an exception.
A callable provided in success evaluates to True, which marks the sensor as successful.
Otherwise, the truthiness of the selected value determines success.
The sensor uses Neo4jHook and the
Neo4j Python driver for communication
with the database.
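The decision rules described above can be sketched in plain Python. This is a minimal illustration, not the provider's actual implementation: the function name evaluate is hypothetical, and RuntimeError stands in for the exception the sensor raises so that the block runs without Airflow installed.

```python
from typing import Any, Callable, Optional


def evaluate(
    value: Any,
    success: Optional[Callable[[Any], bool]] = None,
    failure: Optional[Callable[[Any], bool]] = None,
) -> bool:
    """Return True when a poke should succeed, False to keep waiting."""
    # The failure callable is checked first, so it wins over success.
    if failure is not None and failure(value):
        # Stand-in for the exception raised by the real sensor.
        raise RuntimeError(f"Failure condition met for value: {value!r}")
    # With a success callable, its result decides the outcome.
    if success is not None:
        return bool(success(value))
    # Without callables, plain truthiness of the selected value decides.
    return bool(value)


print(evaluate(0))                             # False: 0 is falsy
print(evaluate(3))                             # True: non-zero is truthy
print(evaluate(3, success=lambda v: v >= 10))  # False: keep waiting
```

One consequence worth noting: without a success callable, a query that returns a count of 0 keeps the sensor waiting, because 0 is falsy.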
Prerequisites¶
To use the Neo4j sensor:
A Neo4j instance must be reachable from the Airflow environment.
A valid Neo4j connection must be configured in Airflow (for example neo4j_default), as described in Neo4j Connection.
The neo4j provider package must be installed in your Airflow environment.
Basic Usage¶
The simplest use case is to run a Cypher query and rely on the first value of the first row to determine success. Any truthy value will mark the sensor as successful.
Example: Wait for at least one Person node to exist:
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.providers.neo4j.sensors.neo4j import Neo4jSensor

with DAG(
    dag_id="example_neo4j_sensor_basic",
    start_date=days_ago(1),
    schedule=None,
    catchup=False,
):
    wait_person_exists = Neo4jSensor(
        task_id="wait_person_exists",
        neo4j_conn_id="neo4j_default",
        cypher="MATCH (p:Person) RETURN count(p) > 0",
    )
In this example, the Cypher query returns a single boolean value. When it becomes
True, the sensor succeeds.
Templating¶
The following fields of Neo4jSensor
are templated:
cypher
parameters
This allows you to build dynamic queries using Jinja templates based on the execution context.
Example: Use execution date in the query:
wait_events_for_date = Neo4jSensor(
    task_id="wait_events_for_date",
    neo4j_conn_id="neo4j_default",
    cypher="""
        MATCH (e:Event)
        WHERE e.date = $event_date
        RETURN count(e) > 0
    """,
    parameters={"event_date": "{{ ds }}"},
)
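To make the effect of templating concrete, the sketch below shows what the parameters dictionary could look like after rendering for a logical date of 2024-05-01. The helper render_ds is hypothetical and only imitates the ds macro with the standard library; Airflow itself renders the field with Jinja.

```python
from datetime import date


def render_ds(template: str, logical_date: date) -> str:
    """Hypothetical stand-in for Jinja: substitute only the ``{{ ds }}`` macro."""
    # The ds macro is the logical date formatted as YYYY-MM-DD.
    return template.replace("{{ ds }}", logical_date.isoformat())


parameters = {"event_date": "{{ ds }}"}
rendered = {
    key: render_ds(value, date(2024, 5, 1)) for key, value in parameters.items()
}
print(rendered)  # {'event_date': '2024-05-01'}
```

The rendered value is a string. If e.date is stored as a Neo4j date rather than a string, the comparison would likely need a conversion in the query, for example date($event_date).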
Advanced Usage¶
Custom success and failure conditions¶
You can provide callables for success and failure to implement more complex
logic. Each callable receives the selected value and must return a boolean.
Note: the failure condition takes priority over the success condition.
success(value) – if provided, the sensor succeeds when this returns True.
failure(value) – if provided and returns True, the sensor raises AirflowException.
Example: Wait until a count reaches at least 10, and fail if it ever exceeds 1,000:
def success_when_at_least_10(value):
    return value >= 10


def fail_when_too_large(value):
    return value > 1000


wait_count_in_range = Neo4jSensor(
    task_id="wait_count_in_range",
    neo4j_conn_id="neo4j_default",
    cypher="MATCH (n:Item) RETURN count(n)",
    success=success_when_at_least_10,
    failure=fail_when_too_large,
)
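Because the two callables are ordinary functions, they can be exercised without a database. The sketch below runs a few sample counts through them; the outcome helper is hypothetical and only mirrors the documented priority of failure over success.

```python
def success_when_at_least_10(value):
    return value >= 10


def fail_when_too_large(value):
    return value > 1000


def outcome(value):
    """Hypothetical helper: name the result of one poke for a given count."""
    if fail_when_too_large(value):       # failure is checked first
        return "fail"
    if success_when_at_least_10(value):
        return "succeed"
    return "wait"


for count in (3, 10, 1000, 1001):
    print(count, outcome(count))
# 3 wait
# 10 succeed
# 1000 succeed
# 1001 fail
```

A count of 1001 satisfies both callables; since the failure condition takes priority, the sensor fails rather than succeeds.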
Using a custom selector¶
By default, the sensor applies operator.itemgetter(0) to the first row of the
result set, effectively selecting the first value of the first row. You can override
this with a custom selector callable.
The selector receives the first row as a tuple and must return a single value
to be used by success / failure or for truthiness evaluation.
Example: Select a specific column from the row:
from operator import itemgetter

# Assume the query returns rows of the form (count, status)
wait_status_ok = Neo4jSensor(
    task_id="wait_status_ok",
    neo4j_conn_id="neo4j_default",
    cypher="MATCH (s:ServiceStatus) RETURN s.count, s.status ORDER BY s.timestamp DESC LIMIT 1",
    selector=itemgetter(1),  # pick the 'status' column
    success=lambda status: status == "OK",
)
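The selector mechanics can be tried on a plain tuple. In this sketch the sample row and the count_if_ok selector are hypothetical; only operator.itemgetter comes from the text above.

```python
from operator import itemgetter

# Hypothetical first row of the form (count, status).
row = (42, "OK")

default_selector = itemgetter(0)  # the documented default: first value
status_selector = itemgetter(1)   # the selector used in the example

print(default_selector(row))  # 42
print(status_selector(row))   # OK


def count_if_ok(row):
    """Hypothetical selector that combines both columns into one value."""
    count, status = row
    return count if status == "OK" else 0


print(count_if_ok(row))            # 42
print(count_if_ok((42, "DOWN")))   # 0
```

A selector that returns a single combined value, as count_if_ok does, lets the default truthiness check work without a separate success callable.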
Handling empty results¶
If the query returns no rows:
When fail_on_empty=False (default), the sensor simply returns False and will be re-scheduled for the next poke.
When fail_on_empty=True, the sensor raises AirflowException.
Example: Fail immediately if there are no results:
wait_non_empty = Neo4jSensor(
    task_id="wait_non_empty",
    neo4j_conn_id="neo4j_default",
    cypher="MATCH (o:Order) RETURN o.id LIMIT 1",
    fail_on_empty=True,
)
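The empty-result behaviour can be sketched as follows. The poke function here is a hypothetical stand-in for a single poke, it takes rows as a list of tuples, and it raises RuntimeError in place of the sensor's exception so that it runs without Airflow.

```python
def poke(rows, fail_on_empty=False):
    """Hypothetical single poke: handle empty results, then check truthiness."""
    if not rows:
        if fail_on_empty:
            # Stand-in for the exception raised by the real sensor.
            raise RuntimeError("Query returned no rows")
        return False  # keep waiting and poke again later
    # Default selection: first value of the first row.
    return bool(rows[0][0])


print(poke([]))               # False
print(poke([("order-1",)]))   # True

try:
    poke([], fail_on_empty=True)
except RuntimeError as exc:
    print(exc)                # Query returned no rows
```

Note that an aggregating query such as MATCH (o:Order) RETURN count(o) returns one row even when nothing matches, so fail_on_empty only matters for queries that can return zero rows, like the o.id query above.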
Reference¶
Parameters¶
neo4j_conn_id
    Connection ID to use for connecting to Neo4j. Defaults to "neo4j_default".
cypher
    Cypher statement to execute. This field is templated.
parameters
    Dictionary of query parameters passed to the Cypher statement. This field is templated.
success
    Optional callable that receives the selected value and returns a boolean. When provided, the sensor succeeds only when this callable returns True.
failure
    Optional callable that receives the selected value. If provided and it returns True, the sensor raises AirflowException.
selector
    Callable that receives the first row of the query result as a tuple and returns a single value to be evaluated. Defaults to selecting the first element of the row with operator.itemgetter(0).
fail_on_empty
    When set to True, the sensor raises an exception if the query returns no rows. When False (default), the sensor simply returns False and will poke again later.
**kwargs
    Additional keyword arguments passed to BaseSensorOperator, such as poke_interval, timeout, or mode.