ETL: DB to Neo4j

images/download/attachments/8356375/etl_3.png

Load data from an SQL database with an SQL query and transform every record of the result with Groovy (for example, to remove special characters). Then write each transformed record to a CSV file. Finally, load the data from the CSV file into Neo4j using Cypher (the LOAD CSV command).

This job stores the data from the SQL database in a CSV file before loading it into Neo4j, which makes it possible to use the Cypher LOAD CSV command. LOAD CSV gives the best performance when importing large amounts of data from CSV. Alternatively, each record of the SQL result can be inserted into Neo4j directly using Cypher writing clauses (for example MERGE, CREATE, ...).

Drivers used:

  • PostgreSQL

  • Groovy

  • CSV

  • Neo4j

<!DOCTYPE etl SYSTEM "http://scriptella.javaforge.com/dtd/etl.dtd">
<etl>
<description>DB to Neo4j</description>
<!-- All connection settings live here so they can be changed in one place. -->
<properties>
db.driver=postgresql
db.url=jdbc:postgresql://localhost:5432/cmdb
db.user=someuser
db.password=somepassword
neo4j.url=bolt://localhost:7687
neo4j.user=neo4j
neo4j.password=admin
csv.file1=/usr/local/data/nodes.csv
csv.file2=/usr/local/data/edges.csv
</properties>
<!-- ######## CONNECTIONS ######## -->
<connection id="db" driver="$db.driver" url="$db.url" user="$db.user" password="$db.password" />
<connection id="csv_out_1" driver="csv" url="$csv.file1">
null_string=<!-- if this property is not set, the column name is written into the CSV whenever the query returns null -->
separator=;
flush=true<!-- if false, the file is flushed only when the connection is closed (at the end of the ETL script), but the CSV must be complete before Neo4j reads it -->
</connection>
<connection id="csv_out_2" driver="csv" url="$csv.file2">
null_string=
separator=;
flush=true
</connection>
<!-- Neo4j settings are read from <properties>, like the SQL connection, instead of being hard-coded here. -->
<connection id="neo4j" driver="neo4j" url="$neo4j.url" user="$neo4j.user" password="$neo4j.password"/>
<connection id="groovy" driver="script">language=groovy</connection>
<!-- Not referenced by any element below; presumably kept for debug output. -->
<connection id="logInfo" driver="log">
level=INFO
</connection>
<!-- ######## EXECUTION ######## -->
<!-- Export configuration items (future nodes) to the first CSV file. -->
<!-- write CSV header -->
<script connection-id="csv_out_1">
name_normalized,id_sm,name,title,type,subtype
</script>
<query connection-id="db">
select
    coalesce(id_sm, '') as id_sm,
    logical_name,
    title,
    coalesce(type, '') as type,
    coalesce(subtype, '') as subtype
from device2m1
<!-- transform each result row -->
<query connection-id="groovy">
<![CDATA[
import java.text.Normalizer
import java.text.Normalizer.Form

// Strip diacritics (NFD decomposition, then drop every non-ASCII character) and spaces.
// null and empty strings are returned unchanged.
def normalize = { s ->
    (s != null && s.length() > 0) ?
        Normalizer.normalize(s, Form.NFD).replaceAll("[^\\p{ASCII}]", "").replaceAll(" ", "") :
        s
}

name_normalized = normalize(logical_name)
// run the nested elements (the CSV write below) for the current row
query.next()
]]>
<!-- write record into CSV -->
<script connection-id="csv_out_1">
$name_normalized,$id_sm,$logical_name,$title,$type,$subtype
</script>
</query>
</query>
<!-- Export relationships (future edges) to the second CSV file. -->
<!-- write CSV header -->
<script connection-id="csv_out_2">
logical_name_normalized,relationship_name,related_cis_normalized,record_number,relationship_type,relationship_subtype
</script>
<query connection-id="db">
select
    logical_name,
    relationship_name,
    related_cis,
    record_number,
    relationship_type,
    relationship_subtype
from relations
<!-- transform each result row -->
<query connection-id="groovy">
<![CDATA[
import java.text.Normalizer
import java.text.Normalizer.Form

// Same normalization as for the nodes, so relationship endpoints match node ids.
def normalize = { s ->
    (s != null && s.length() > 0) ?
        Normalizer.normalize(s, Form.NFD).replaceAll("[^\\p{ASCII}]", "").replaceAll(" ", "") :
        s
}

logical_name_normalized = normalize(logical_name)
related_cis_normalized = normalize(related_cis)
// run the nested elements (the CSV write below) for the current row
query.next()
]]>
<!-- write record into CSV -->
<script connection-id="csv_out_2">
$logical_name_normalized,$relationship_name,$related_cis_normalized,$record_number,$relationship_type,$relationship_subtype
</script>
</query>
</query>
<!-- load into Neo4j -->
<!-- Rows whose normalized name is missing or empty are skipped. MERGE on a null id fails the
     whole statement, and an empty id would collapse all such rows into a single node.
     coalesce() covers both cases, whether the empty CSV field is read as null or as ''.
     The test is written as NOT ... = '' because '<' is not allowed in XML text.
     NOTE(review): MERGE/MATCH on :Ci(id) is slow without an index or uniqueness constraint
     on :Ci(id); confirm one exists in the target database. -->
<script connection-id="neo4j">
LOAD CSV WITH HEADERS FROM 'file:$csv.file1' AS line FIELDTERMINATOR ';'
WITH line
WHERE NOT coalesce(line.name_normalized, '') = ''
MERGE (n:Ci {id: line.name_normalized})
SET
    n.idSm = line.id_sm,
    n.logicalName = line.name,
    n.title = line.title,
    n.type = line.type,
    n.subtype = line.subtype,
    n.updated = timestamp()
</script>
<!-- Connect existing :Ci nodes. Rows with a missing or empty endpoint id are skipped, for
     consistency with the node load above. -->
<script connection-id="neo4j">
LOAD CSV WITH HEADERS FROM 'file:$csv.file2' AS line FIELDTERMINATOR ';'
WITH line
WHERE NOT coalesce(line.logical_name_normalized, '') = ''
  AND NOT coalesce(line.related_cis_normalized, '') = ''
MATCH (n1:Ci {id: line.logical_name_normalized})
MATCH (n2:Ci {id: line.related_cis_normalized})
MERGE (n1)-[r:RELATED {id: line.relationship_name}]->(n2)
SET
    r.recNumber = line.record_number,
    r.relType = line.relationship_type,
    r.relSubType = line.relationship_subtype,
    r.updated = timestamp()
</script>
</etl>