(nifi-tutorial)=
# Connecting to CrateDB from Apache NiFi

This article describes how to connect from [Apache NiFi](http://nifi.apache.org) to CrateDB and ingest data from NiFi into CrateDB.

## Prerequisites
To follow this article, you will need:
* A CrateDB cluster
* An Apache NiFi installation that can connect to the CrateDB cluster

## Configure
First, we will set up a connection pool to CrateDB:
 1. On the main NiFi web interface, click the gear icon of your process group ("NiFi Flow" by default).
 2. Switch to "Controller Services" and click the plus icon to add a new controller.
 3. Choose "DBCPConnectionPool" as type and click "Add".
 4. Open the settings of the newly created connection pool and switch to "Properties". The table below describes in more detail which parameters need to be changed.

| Parameter | Description | Sample value |
| --- | --- | --- |
| Database Connection URL | The JDBC connection string pointing to CrateDB | `jdbc:postgresql://<CrateDB host>:5432/doc?ssl=true&sslfactory=org.postgresql.ssl.NonValidatingFactory` |
| Database Driver Class Name | The PostgreSQL JDBC driver class name | `org.postgresql.Driver` |
| Database Driver Location(s) | [Download](https://jdbc.postgresql.org/download/) the latest PostgreSQL JDBC driver and place it on the file system of the NiFi host | `/opt/nifi/nifi-1.13.2/postgresql-42.2.23.jar` |
| Database User | The CrateDB user name | |
| Password | The password of your CrateDB user | |

 5. After applying the changed properties, click the flash icon to enable the service.

Now the connection pool is ready to be used in one of NiFi's processors.
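To see which pieces of the sample connection URL go where, it can help to take the URL apart programmatically. A minimal Python sketch (the hostname `crate.example.com` is a placeholder, not a real endpoint):

```python
from urllib.parse import urlparse, parse_qs

# JDBC URLs carry a "jdbc:" prefix that urlparse does not understand,
# so strip it and parse the PostgreSQL-style remainder.
jdbc_url = ("jdbc:postgresql://crate.example.com:5432/doc"
            "?ssl=true&sslfactory=org.postgresql.ssl.NonValidatingFactory")
parsed = urlparse(jdbc_url[len("jdbc:"):])

print(parsed.hostname)           # host the connection pool will reach
print(parsed.port)               # CrateDB's PostgreSQL wire protocol port
print(parsed.path.lstrip("/"))   # target schema, "doc" in this example
print(parse_qs(parsed.query)["ssl"])  # TLS enabled via query parameter
```

The `ssl` and `sslfactory` parameters are interpreted by the PostgreSQL JDBC driver, not by NiFi itself.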

## Example: Read from CSV files
One common use case is to design a process in NiFi that results in data being ingested into CrateDB. As an example, we will take a CSV file from the [NYC Taxi Data](https://github.com/toddwschneider/nyc-taxi-data) repository, process it in NiFi, and then ingest it into CrateDB.

To achieve high throughput, NiFi by default uses prepared statements with a configurable batch size. The optimal batch size depends on your concrete use case; 500 is typically a good starting point. Please also see the documentation on [insert performance](https://crate.io/docs/crate/howtos/en/latest/performance/inserts/index.html) for additional information.
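To make the batching idea concrete, here is a minimal Python sketch of splitting a record stream into batches of 500 (the row data is a made-up stand-in, not taxi data):

```python
def batches(rows, batch_size=500):
    """Yield successive slices of `rows`, each at most `batch_size` long."""
    for start in range(0, len(rows), batch_size):
        yield rows[start:start + batch_size]

rows = list(range(1200))  # stand-in for 1200 CSV records
sizes = [len(batch) for batch in batches(rows)]
print(sizes)  # -> [500, 500, 200]
```

Each batch then travels through a single prepared statement instead of 500 individual round trips, which is where the throughput gain comes from.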
In CrateDB, we first create the corresponding target table:

```sql
CREATE TABLE "doc"."yellow_taxi_trips" (
   "vendor_id" TEXT,
   "pickup_datetime" TIMESTAMP WITH TIME ZONE,
   "dropoff_datetime" TIMESTAMP WITH TIME ZONE,
   "passenger_count" INTEGER,
   "trip_distance" REAL,
   "pickup_longitude" REAL,
   "pickup_latitude" REAL,
   "rate_code" INTEGER,
   "store_and_fwd_flag" TEXT,
   "dropoff_longitude" REAL,
   "dropoff_latitude" REAL,
   "payment_type" TEXT,
   "fare_amount" REAL,
   "surcharge" REAL,
   "mta_tax" REAL,
   "tip_amount" REAL,
   "tolls_amount" REAL,
   "total_amount" REAL
);
```

After configuring the processors as described below, click the start icon on the process group window. You should see rows appearing in CrateDB after a short amount of time. If you encounter any issues, please also check NiFi's log files (`log/nifi-bootstrap.log` and `log/nifi-app.log`).

### GetFile
The `GetFile` processor points to a local directory that contains the file [yellow_tripdata_2013-08.csv](https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2013-08.csv).
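The "Use String Fields From Header" strategy used by the CSVReader below means the first row of the file supplies the field names and every value is read as a string. A minimal Python sketch of the same idea, using a two-line inline stand-in for the taxi file (columns abbreviated):

```python
import csv
import io

# Tiny stand-in for yellow_tripdata_2013-08.csv; only three of the columns.
sample = io.StringIO(
    "vendor_id,passenger_count,trip_distance\n"
    "CMT,1,2.5\n"
)

# DictReader takes field names from the header row and yields string values,
# mirroring the "Use String Fields From Header" schema access strategy.
reader = csv.DictReader(sample)
record = next(reader)
print(record)  # -> {'vendor_id': 'CMT', 'passenger_count': '1', 'trip_distance': '2.5'}
```

Type conversion to `INTEGER`, `REAL`, and so on happens later, when the values are bound to the prepared statement against the target table.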

### PutDatabaseRecord
The `PutDatabaseRecord` processor has several properties that need to be configured:
* Record Reader: CSVReader, configured with "Use String Fields From Header" as its "Schema Access Strategy"
* Database Type: PostgreSQL
* Statement Type: INSERT
* Database Connection Pooling Service: The connection pool created previously
* Schema Name: `doc`
* Table Name: `yellow_taxi_trips`
* Maximum Batch Size: 200
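Conceptually, `PutDatabaseRecord` turns each record into a parameterized INSERT and sends the records through in batches. A rough stand-alone sketch of that idea, using SQLite in place of CrateDB and two made-up records (this is an illustration, not NiFi's actual implementation):

```python
import sqlite3

# Two made-up records with a subset of the taxi columns.
records = [
    {"vendor_id": "CMT", "passenger_count": 1, "trip_distance": 2.5},
    {"vendor_id": "VTS", "passenger_count": 2, "trip_distance": 0.8},
]

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE yellow_taxi_trips "
    "(vendor_id TEXT, passenger_count INTEGER, trip_distance REAL)"
)

# Build one parameterized INSERT from the record's field names.
columns = list(records[0])
placeholders = ", ".join("?" for _ in columns)
sql = f"INSERT INTO yellow_taxi_trips ({', '.join(columns)}) VALUES ({placeholders})"

# executemany pushes the whole batch through a single prepared statement,
# analogous to PutDatabaseRecord's "Maximum Batch Size" behaviour.
conn.executemany(sql, [tuple(r[c] for c in columns) for r in records])
conn.commit()
print(conn.execute("SELECT COUNT(*) FROM yellow_taxi_trips").fetchone()[0])  # -> 2
```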

## Example: Read from another SQL-based database
Data can also be read from an SQL database and then inserted into CrateDB:

### ExecuteSQLRecord
Reads rows from the source database.
* Database Connection Pooling Service: A connection pool pointing to the source database
* SQL select query: The SQL query to retrieve rows as needed
* RecordWriter: JsonRecordSetWriter. JSON files are required by the following processors for conversion into SQL statements.
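The JsonRecordSetWriter's output can be pictured as the result set serialized into a JSON array of records, one object per row. A minimal Python sketch with made-up source rows:

```python
import json

# Made-up result rows from the source database, one dict per row.
rows = [
    {"vendor_id": "CMT", "trip_distance": 2.5},
    {"vendor_id": "VTS", "trip_distance": 0.8},
]

# Serialize the result set as a JSON array of records; this is the shape
# the downstream ConvertJSONToSQL processor consumes.
payload = json.dumps(rows)
print(payload)
```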

### ConvertJSONToSQL
Converts the generated JSON files into SQL statements.
* JDBC Connection Pool: A connection pool pointing to CrateDB
* Statement Type: INSERT
* Table Name: Name of the target table in CrateDB (without schema name)
* Schema Name: The table's schema name in CrateDB
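The conversion step can be sketched as: for each JSON record, derive a parameterized INSERT against the configured schema and table, with the record's values as the bind parameters. A minimal Python illustration (schema and table names mirror the earlier example; the record is made up):

```python
import json

record = json.loads('{"vendor_id": "CMT", "trip_distance": 2.5}')

schema, table = "doc", "yellow_taxi_trips"
columns = list(record)
sql = (f"INSERT INTO {schema}.{table} ({', '.join(columns)}) "
       f"VALUES ({', '.join('?' for _ in columns)})")
params = tuple(record[c] for c in columns)

print(sql)     # -> INSERT INTO doc.yellow_taxi_trips (vendor_id, trip_distance) VALUES (?, ?)
print(params)  # -> ('CMT', 2.5)
```

Keeping the values as bind parameters rather than splicing them into the SQL text is what lets the following `PutSQL` processor batch the statements efficiently.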

### PutSQL
Executes the previously generated SQL statements as prepared statements.
* JDBC Connection Pool: A connection pool pointing to CrateDB
* SQL Statement: No value set
* Batch Size: 500 (the optimal value for your use case might vary)