跳至主要內容

Connection


Connection

介绍

在Airflow中,Connection是指用于连接到外部系统或服务的信息。这些外部系统或服务可以是数据库、API、FTP服务器等等。

Connection包括以下信息:

  • Connection ID:用于在Airflow中唯一标识该Connection的字符串。
  • Connection Type:Connection的类型,例如数据库、SSH、FTP等。
  • Host:连接的主机名或IP地址。
  • Schema:连接的数据库模式或名称。
  • Login:连接的用户名。
  • Password:连接的密码。
  • Port:连接的端口号。
  • Extra:其他的连接参数,例如SSL配置、OAuth令牌等。

在Airflow中,Connection可以在Web UI中进行配置,也可以在DAG代码中使用Python代码进行配置。在DAG中,可以使用Connection来连接到外部系统,例如从数据库中提取数据或向API发送请求。Connection的信息可以在DAG中使用Hook对象来访问,并且可以在DAG代码中进行参数化,以便在不同的环境中使用不同的Connection信息。

举例

一个常见的Connection的例子是连接到MySQL数据库。在Airflow中,可以在Web UI中配置一个MySQL Connection,或者在DAG代码中使用Python代码进行配置,例如:

from airflow import DAG
from airflow.providers.mysql.hooks.mysql import MySqlHook
from datetime import datetime

dag = DAG(
    'my_dag',
    start_date=datetime(2023, 6, 24),
    schedule_interval='0 0 * * *'
)

mysql_conn_id = 'my_mysql_conn'

mysql_hook = MySqlHook(mysql_conn_id)

query = 'SELECT * FROM my_table'

result = mysql_hook.get_records(query)

for row in result:
    print(row)

在这个例子中,我们首先定义了一个DAG,然后使用MySqlHook来连接到一个MySQL数据库。mysql_conn_id是在Airflow中定义的MySQL Connection的ID。使用MySqlHookget_records方法,我们可以执行一个SQL查询,并将结果存储在result变量中。最后,我们可以遍历结果并进行处理。这个例子中,我们只是简单地打印每一行的内容。

需要注意的是,这个例子中的Connection信息是在DAG代码中硬编码的。在实际情况中,为了更好的可维护性和可重用性,我们通常会将Connection信息配置在Airflow的配置文件中,然后在DAG代码中使用参数化的方式来访问。

上次编辑于:
贡献者: Neil