
GeoNode Provider for Apache Airflow



Contents

  • About
  • Installation
  • Example DAG

About

This provider connects a GeoNode instance with Apache Airflow, grouping and encapsulating the most common methods for interacting with GeoNode. The user has the following methods available (a short sketch of calling them directly from the hook follows the list):

Methods

  • import_layer: runs the "importlayers" command in the GeoNode container to upload the datasets contained in a specific directory to GeoServer.
  • publish_layer: publishes in GeoServer a layer or set of layers that have been uploaded to the database.
  • update_layer: runs the "updatelayers" command in the GeoNode container so that a layer already uploaded, published, and recognized by GeoServer becomes visible in GeoNode.
  • upload_dataset: uploads a layer or set of layers in ".zip", ".shp", ".dbf", ".shx", ".prj" or ".tif" format to GeoNode via API v2.
  • upload_style: uploads a style in ".sld" format to GeoServer.
  • update_style: updates an existing style.
  • update_layer_style: assigns an already uploaded style to a layer.
  • upload_map: creates a map in GeoNode via API v2 from a set of datasets.
  • upload_document: uploads a document to GeoNode via API v2.
  • upload_metadata: uploads/sets metadata for a given resource type (currently title and supplemental information).
  • set_permiss: sets permissions on a given resource.
  • get_execution_information: gets information/metadata about a performed execution.
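
These methods can also be called directly from an instantiated hook, as in the short sketch below. It is only illustrative: the positional arguments are inferred from the op_args used in the Example DAG further down, so check the hook's signatures before relying on them.

from geonode_provider.hooks.geonode import GeoNodeHook

# Instantiate the hook against an existing Airflow connection (see "Connections" below)
hook = GeoNodeHook(geonode_conn_id="geonode_connection_id_airflow")

# Upload the datasets found in a directory, publish them in GeoServer
# and make the resulting layer visible in GeoNode
hook.import_layer("path/to/your/datasets/directory")
hook.publish_layer("your_dataset_name")
hook.update_layer("your_dataset_name")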

Installation

Configuration

To use the GeoNode Hook, you first need to complete some configuration:

Connections

In the Airflow UI, two connections must be created:

GeoNode connection

Airflow must be connected to the destination GeoNode instance you want to work with. To do this, specify the following parameters for the GeoNode instance (a sketch of defining this connection programmatically follows the list):

  • Connection Id: Connection name in Airflow.
  • Base URL: URL/host of GeoNode, with https or http as appropriate.
  • GeoNode username: GeoNode app username (optional if a GeoNode API Token has been provided and GeoServer credentials are defined below).
  • GeoNode password: GeoNode app password (optional if a GeoNode API Token has been provided and GeoServer credentials are defined below).
  • GeoNode API Token: it is possible to authenticate to GeoNode with an API Token (optional if a GeoNode username and password have been provided).
  • GeoServer username: GeoServer app username (optional; if the GeoServer credentials differ from the GeoNode credentials you must specify them, otherwise the GeoNode credentials alone are enough).
  • GeoServer password: GeoServer app password (optional; if the GeoServer credentials differ from the GeoNode credentials you must specify them, otherwise the GeoNode credentials alone are enough).
  • SSH Host: SSH host of the machine where the GeoNode instance is hosted.
  • SSH port: SSH port of the machine where the GeoNode instance is hosted.
  • SSH username: SSH username on the machine where the GeoNode instance is hosted.
  • SSH password: SSH password on the machine where the GeoNode instance is hosted.
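
If you prefer to create this connection programmatically instead of through the UI, the sketch below uses Airflow's Connection model. It is only a sketch: the conn_type and the keys inside extra are assumptions about how the hook reads the fields listed above, so verify them against the provider's hook before using it.

import json

from airflow import settings
from airflow.models import Connection

# Hypothetical sketch: the "extra" keys below are assumed names for the fields
# listed above; the GeoNode hook may expect different ones.
conn = Connection(
    conn_id="geonode_connection_id_airflow",
    conn_type="generic",                    # assumption: the provider may register its own type
    host="https://my-geonode.example.com",  # Base URL
    login="geonode_user",                   # GeoNode username
    password="geonode_password",            # GeoNode password
    extra=json.dumps({
        "geonode_api_token": "",            # GeoNode API Token (optional)
        "geoserver_username": "",           # GeoServer username (optional)
        "geoserver_password": "",           # GeoServer password (optional)
        "ssh_host": "my-geonode.example.com",
        "ssh_port": 22,
        "ssh_username": "ssh_user",
        "ssh_password": "ssh_password",
    }),
)

session = settings.Session()
session.add(conn)
session.commit()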



Example DAG

A DAG template showing how the different methods of the GeoNode Provider can be used is provided in the "example-dag" folder.

Pay special attention to the parameters each task receives and their order, as well as to the way the Hook is instantiated and how its different methods are called.

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from datetime import timedelta

from geonode_provider.hooks.geonode import GeoNodeHook


default_args = {
    'owner': 'KAN',
    'depends_on_past': False,
    'start_date': days_ago(1),
    'retry_delay': timedelta(minutes=3),
    'email_on_failure': False,
    'email_on_retry': False,
}

geonode_hook = GeoNodeHook(geonode_conn_id="geonode_connection_id_airflow") # Instantiate the GeoNodeHook with the Airflow connection ID

with DAG("GeonodeHook-ExampleDAG", default_args=default_args, schedule_interval=None, tags=['Airflow', 'GeoNode']) as dag:

    start = EmptyOperator(
        task_id='start',
        dag=dag
    )

    import_layer_task = PythonOperator(
        task_id='import_layer_task',
        python_callable=geonode_hook.import_layer,
        op_args=[
            "path/to/you/datasets/directory"
        ],
        provide_context=True,
        dag=dag
    )

    publish_layer_task = PythonOperator(
        task_id='publish_layer_task',
        python_callable=geonode_hook.publish_layer,
        op_args=[
            "your_dataset_name"
        ],
        provide_context=True,
        dag=dag
    )

    update_layer_task = PythonOperator(
        task_id='update_layer_task',
        python_callable=geonode_hook.update_layer,
        op_args=[
            "your_dataset_name"
        ],
        provide_context=True,
        dag=dag
    )

    upload_dataset_api = PythonOperator(
        task_id='upload_dataset_api',
        python_callable=geonode_hook.upload_dataset,
        op_args=[
            "path/to/your/datasets/directory/or/file"
        ],
        provide_context=True,
        dag=dag
    )

    upload_style_task = PythonOperator(
        task_id='upload_style_task',
        python_callable=geonode_hook.upload_style,
        op_args=[
            "path/to/you/sld/style",
            "style_name" # Optional
        ],
        provide_context=True,
        dag=dag
    )

    update_style_task = PythonOperator(
        task_id='update_style_task',
        python_callable=geonode_hook.update_style,
        op_args=[
            "path/to/you/sld/style",
            "style_name" # Optional
        ],
        provide_context=True,
        dag=dag
    )

    update_layer_style_task = PythonOperator(
        task_id='update_layer_style_task',
        python_callable=geonode_hook.update_layer_style,
        op_args=[
            "your_dataset_name",
            "your_style_name.sld"
        ],
        provide_context=True,
        dag=dag
    )

    upload_map = PythonOperator(
        task_id='upload_map',
        python_callable=geonode_hook.upload_map,
        op_args=[
            [dataset_1_id, dataset_2_id, dataset_3_id],  # IDs of the datasets to include in the map
            "Map Title"
        ],
        provide_context=True,
        dag=dag
    )

    upload_document_task = PythonOperator(
        task_id='upload_document_task',
        python_callable=geonode_hook.upload_document,
        op_args=[
            "path/to/your/document"
        ],
        provide_context=True,
        dag=dag
    )

    upload_metadata_task = PythonOperator(
        task_id='upload_metadata_task',
        python_callable=geonode_hook.upload_metadata,
        op_args=[
            "your_resource_type",
            your_resource_id,
            "Resource Title",
            "Resource supplemental information"
        ],
        provide_context=True,
        dag=dag
    )

    set_permiss_task = PythonOperator(
        task_id='set_permiss_task',
        python_callable=geonode_hook.set_permiss,
        op_args=[
            your_resource_id,
            {
                "users": [
                    {
                        "id": <user_id>,
                        "permissions": "none"
                    },
                    {
                        "id": <user_id>,
                        "permissions": "view"
                    },
                    {
                        "id": <user_id>,
                        "permissions": "download"
                    },
                    {
                        "id": <user_id>,
                        "permissions": "edit"
                    },
                    {
                        "id": <user_id>,
                        "permissions": "manage"
                    },
                    {
                        "id": <user_id>,
                        "permissions": "owner"
                    }
                ],
                "groups": [
                    {
                        "id": <group_id>,
                        "permissions": "none"
                    },
                    {
                        "id": <group_id>,
                        "permissions": "none"
                    }
                ],
                "organizations": [
                    {
                        "id": <organization_id>,
                        "permissions": "edit"
                    }
                ]
            }
        ],
        provide_context=True,
        dag=dag
    )

    get_execution_information_task = PythonOperator(
        task_id='get_execution_information_task',
        python_callable=geonode_hook._get_execution_information,
        op_args=[
            "execution_id"
        ],
        provide_context=True,
        dag=dag
    )

    end = EmptyOperator(
        task_id='end',
        dag=dag
    )


    (
        start
        >> import_layer_task
        >> publish_layer_task
        >> update_layer_task
        >> upload_dataset_api
        >> upload_style_task
        >> update_style_task
        >> update_layer_style_task
        >> upload_map
        >> upload_document_task
        >> upload_metadata_task
        >> set_permiss_task
        >> get_execution_information_task
        >> end
    )


© 2024 KAN Territory & IT. All rights reserved.