OrientDB

OrientDB is a multi-model database, supporting graph, document, key/value, and object models, but the relationships are managed as in graph databases with direct connections between records. It supports schema-less, schema-full and schema-mixed modes. It has a strong security profiling system based on users and roles and supports querying with Gremlin along with SQL extended for graph traversal.

For more information about OrientDB, see the official documentation; more details are available in the OrientDB manual.

The Alpakka OrientDB connector provides Akka Stream sources and sinks for OrientDB.

Project Info: Alpakka OrientDB
Artifact: com.lightbend.akka : akka-stream-alpakka-orientdb : 1.0-M2
JDK versions: OpenJDK 8
Scala versions: 2.12.7, 2.11.12
JPMS module name: akka.stream.alpakka.orientdb
License
Readiness level: Community-driven, since 0.17 (2018-02-19)
Home page: https://doc.akka.io/docs/alpakka/current/
API documentation
Forums
Release notes: In the documentation
Issues: GitHub issues
Sources: https://github.com/akka/alpakka

Artifacts

sbt
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-orientdb" % "1.0-M2"
Maven
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-orientdb_2.12</artifactId>
  <version>1.0-M2</version>
</dependency>
Gradle
dependencies {
  compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-orientdb_2.12', version: '1.0-M2'
}

The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.

Direct dependencies
Organization    Artifact    Version    License
com.orientechnologies    orientdb-graphdb    3.0.4    Apache 2
com.orientechnologies    orientdb-object    3.0.4    Apache 2
com.typesafe.akka    akka-stream_2.12    2.5.19    Apache License, Version 2.0
org.scala-lang    scala-library    2.12.7    BSD 3-Clause
Dependency tree
com.orientechnologies    orientdb-graphdb    3.0.4    Apache 2
    com.orientechnologies    orientdb-core    3.0.4    Apache 2
        com.googlecode.concurrentlinkedhashmap    concurrentlinkedhashmap-lru    1.4.2    The Apache Software License, Version 2.0
        net.java.dev.jna    jna-platform    4.5.0    LGPL, version 2.1
            net.java.dev.jna    jna    4.5.0    LGPL, version 2.1
        net.java.dev.jna    jna    4.5.0    LGPL, version 2.1
    com.orientechnologies    orientdb-server    3.0.4    Apache 2
        com.orientechnologies    orientdb-client    3.0.4    Apache 2
            com.orientechnologies    orientdb-core    3.0.4    Apache 2
                com.googlecode.concurrentlinkedhashmap    concurrentlinkedhashmap-lru    1.4.2    The Apache Software License, Version 2.0
                net.java.dev.jna    jna-platform    4.5.0    LGPL, version 2.1
                    net.java.dev.jna    jna    4.5.0    LGPL, version 2.1
                net.java.dev.jna    jna    4.5.0    LGPL, version 2.1
        com.orientechnologies    orientdb-tools    3.0.4    Apache 2
            com.orientechnologies    orientdb-client    3.0.4    Apache 2
                com.orientechnologies    orientdb-core    3.0.4    Apache 2
                    com.googlecode.concurrentlinkedhashmap    concurrentlinkedhashmap-lru    1.4.2    The Apache Software License, Version 2.0
                    net.java.dev.jna    jna-platform    4.5.0    LGPL, version 2.1
                        net.java.dev.jna    jna    4.5.0    LGPL, version 2.1
                    net.java.dev.jna    jna    4.5.0    LGPL, version 2.1
            com.orientechnologies    orientdb-core    3.0.4    Apache 2
                com.googlecode.concurrentlinkedhashmap    concurrentlinkedhashmap-lru    1.4.2    The Apache Software License, Version 2.0
                net.java.dev.jna    jna-platform    4.5.0    LGPL, version 2.1
                    net.java.dev.jna    jna    4.5.0    LGPL, version 2.1
                net.java.dev.jna    jna    4.5.0    LGPL, version 2.1
        com.sun.xml.bind    jaxb-core    2.3.0.1    CDDL+GPL License
        com.sun.xml.bind    jaxb-impl    2.3.0.1    CDDL+GPL License
        javax.mail    mail    1.4.7    CDDL
            javax.activation    activation    1.1    Common Development and Distribution License (CDDL) v1.0
        javax.xml.bind    jaxb-api    2.3.0    CDDL 1.1
    com.orientechnologies    orientdb-tools    3.0.4    Apache 2
        com.orientechnologies    orientdb-client    3.0.4    Apache 2
            com.orientechnologies    orientdb-core    3.0.4    Apache 2
                com.googlecode.concurrentlinkedhashmap    concurrentlinkedhashmap-lru    1.4.2    The Apache Software License, Version 2.0
                net.java.dev.jna    jna-platform    4.5.0    LGPL, version 2.1
                    net.java.dev.jna    jna    4.5.0    LGPL, version 2.1
                net.java.dev.jna    jna    4.5.0    LGPL, version 2.1
        com.orientechnologies    orientdb-core    3.0.4    Apache 2
            com.googlecode.concurrentlinkedhashmap    concurrentlinkedhashmap-lru    1.4.2    The Apache Software License, Version 2.0
            net.java.dev.jna    jna-platform    4.5.0    LGPL, version 2.1
                net.java.dev.jna    jna    4.5.0    LGPL, version 2.1
            net.java.dev.jna    jna    4.5.0    LGPL, version 2.1
    com.tinkerpop.blueprints    blueprints-core    2.6.0    BSD 3-Clause
        com.carrotsearch    hppc    0.6.0    The Apache Software License, Version 2.0
        com.fasterxml.jackson.core    jackson-databind    2.2.3    The Apache Software License, Version 2.0
            com.fasterxml.jackson.core    jackson-annotations    2.2.3    The Apache Software License, Version 2.0
            com.fasterxml.jackson.core    jackson-core    2.2.3    The Apache Software License, Version 2.0
        commons-configuration    commons-configuration    1.6    The Apache Software License, Version 2.0
            commons-beanutils    commons-beanutils-core    1.8.0    The Apache Software License, Version 2.0
            commons-digester    commons-digester    1.8    The Apache Software License, Version 2.0
                commons-beanutils    commons-beanutils    1.7.0
            commons-lang    commons-lang    2.4    The Apache Software License, Version 2.0
        commons-logging    commons-logging    1.1.1    The Apache Software License, Version 2.0
        org.codehaus.jettison    jettison    1.3.3    Apache License, Version 2.0
            stax    stax-api    1.0.1    The Apache Software License, Version 2.0
com.orientechnologies    orientdb-object    3.0.4    Apache 2
    com.orientechnologies    orientdb-core    3.0.4    Apache 2
        com.googlecode.concurrentlinkedhashmap    concurrentlinkedhashmap-lru    1.4.2    The Apache Software License, Version 2.0
        net.java.dev.jna    jna-platform    4.5.0    LGPL, version 2.1
            net.java.dev.jna    jna    4.5.0    LGPL, version 2.1
        net.java.dev.jna    jna    4.5.0    LGPL, version 2.1
    org.hibernate.javax.persistence    hibernate-jpa-2.0-api    1.0.1.Final    Unknown License
    org.javassist    javassist    3.22.0-GA    MPL 1.1
com.typesafe.akka    akka-stream_2.12    2.5.19    Apache License, Version 2.0
    com.typesafe.akka    akka-actor_2.12    2.5.19    Apache License, Version 2.0
        com.typesafe    config    1.3.3    Apache License, Version 2.0
        org.scala-lang.modules    scala-java8-compat_2.12    0.8.0    BSD 3-clause
            org.scala-lang    scala-library    2.12.7    BSD 3-Clause
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    com.typesafe.akka    akka-protobuf_2.12    2.5.19    Apache License, Version 2.0
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    com.typesafe    ssl-config-core_2.12    0.3.6    Apache-2.0
        com.typesafe    config    1.3.3    Apache License, Version 2.0
        org.scala-lang.modules    scala-parser-combinators_2.12    1.1.1    BSD 3-clause
            org.scala-lang    scala-library    2.12.7    BSD 3-Clause
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    org.reactivestreams    reactive-streams    1.0.2    CC0
    org.scala-lang    scala-library    2.12.7    BSD 3-Clause
org.scala-lang    scala-library    2.12.7    BSD 3-Clause

Usage

The sources, flows and sinks provided by this connector need a dbUrl and credentials to access OrientDB.

Scala
val url = "remote:127.0.0.1:2424/"
val dbName = "GratefulDeadConcertsScala"
val dbUrl = s"$url$dbName"
val username = "root"
val password = "root"
Java
private static String url = "remote:127.0.0.1:2424/";
private static String dbName = "GratefulDeadConcertsJava";
private static String dbUrl = url + dbName;
private static String username = "root";
private static String password = "root";

We will also need an ActorSystem and an ActorMaterializer.

Scala
implicit val system: ActorSystem = ActorSystem()
implicit val materializer: Materializer = ActorMaterializer()
Java
system = ActorSystem.create();
materializer = ActorMaterializer.create(system);

This is all the preparation we need.

ODocument message

Now we can stream messages that contain OrientDB’s ODocument (in Scala or Java) from or to OrientDB by passing the database pool (oDatabase in the examples, an OPartitionedDatabasePool) to the OrientDBSource or the OrientDBSink through their settings.

Scala
val f2 = OrientDBSource(
  sink4,
  OrientDBSourceSettings(oDatabasePool = oDatabase)
).map { message =>
    message.oDocument.field[String]("book_title")
  }
  .runWith(Sink.seq)
Java
CompletionStage<Done> f1 =
    OrientDBSource.create(source, OrientDBSourceSettings.create(oDatabase), null)
        .map(m -> OIncomingMessage.create(m.oDocument()))
        .runWith(
            OrientDBSink.create(sink1, OrientDBUpdateSettings.create(oDatabase)), materializer);

Typed messages

It is also possible to stream messages that contain instances of any class.

Java
public class source1 {

  private String book_title;

  public void setBook_title(String book_title) {
    this.book_title = book_title;
  }

  public String getBook_title() {
    return book_title;
  }
}

public class sink2 {

  private String book_title;

  public void setBook_title(String book_title) {
    this.book_title = book_title;
  }

  public String getBook_title() {
    return book_title;
  }
}

To do so, create the source and sink with OrientDBSource.typed and OrientDBSink.typed instead.

Java
CompletionStage<Done> f1 =
    OrientDBSource.typed(source, OrientDBSourceSettings.create(oDatabase), source1.class, null)
        .map(
            m -> {
              ODatabaseDocumentTx db = oDatabase.acquire();
              db.setDatabaseOwner(new OObjectDatabaseTx(db));
              ODatabaseRecordThreadLocal.instance().set(db);
              sink2 sink = new sink2();
              sink.setBook_title(m.oDocument().getBook_title());
              return OIncomingMessage.create(sink);
            })
        .runWith(
            OrientDBSink.typed(sink2, OrientDBUpdateSettings.create(oDatabase), sink2.class),
            materializer);

Configuration

We can configure the source with OrientDBSourceSettings.

Scala (source)
final case class OrientDBSourceSettings(oDatabasePool: OPartitionedDatabasePool,
                                        maxPartitionSize: Int = Runtime.getRuntime.availableProcessors(),
                                        maxPoolSize: Int = -1,
                                        skip: Int = 0,
                                        limit: Int = 10)
Parameter    Default    Description
maxPartitionSize    Runtime.getRuntime.availableProcessors()    OrientDBSource and OrientDBSink use this for initializing DB connections.
maxPoolSize    -1    OrientDBSource and OrientDBSink use this for initializing DB connections.
skip    0    OrientDBSource uses this property to fetch data from the DB.
limit    10    OrientDBSource uses this property to fetch data from the DB.
dbUrl    -    URL of the OrientDB database.
username    -    Username to connect to OrientDB.
password    -    Password to connect to OrientDB.
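
To make skip and limit concrete, the following plain-Java sketch applies the same kind of windowing to an in-memory list. It does not use the connector: the class `SkipLimitSketch` and the method `window` are hypothetical names, and the sketch assumes that skip and limit behave like the SKIP and LIMIT clauses of a query, which the description above does not state explicitly.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical illustration only: not part of the Alpakka OrientDB API.
final class SkipLimitSketch {

  // Returns at most `limit` records, starting after the first `skip` records.
  static <T> List<T> window(List<T> records, int skip, int limit) {
    return records.stream().skip(skip).limit(limit).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> titles = Arrays.asList("Book 1", "Book 2", "Book 3", "Book 4", "Book 5");
    System.out.println(window(titles, 0, 2)); // prints [Book 1, Book 2]
    System.out.println(window(titles, 2, 2)); // prints [Book 3, Book 4]
  }
}
```

Under this assumption, increasing skip by limit on each request walks through the data one page at a time.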

We can also configure the sink with OrientDBUpdateSettings.

Scala (sink)
final case class OrientDBUpdateSettings(oDatabasePool: OPartitionedDatabasePool,
                                        maxPartitionSize: Int = Runtime.getRuntime.availableProcessors(),
                                        maxPoolSize: Int = -1,
                                        maxRetry: Int = 1,
                                        retryInterval: FiniteDuration = 5000 millis,
                                        bufferSize: Int = 10)
Parameter    Default    Description
maxPartitionSize    Runtime.getRuntime.availableProcessors()    OrientDBSource and OrientDBSink use this for initializing DB connections.
maxPoolSize    -1    OrientDBSource and OrientDBSink use this for initializing DB connections.
maxRetry    1    OrientDBSink uses this for retrying write operations to OrientDB.
retryInterval    5000 millis    OrientDBSink uses this for retrying write operations to OrientDB.
bufferSize    10    OrientDBSink uses this for retrieving data from DB.
dbUrl    -    URL of the OrientDB database.
username    -    Username to connect to OrientDB.
password    -    Password to connect to OrientDB.
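
The interplay of maxRetry and retryInterval can be pictured with a small plain-Java sketch. This is not the connector's implementation: `RetrySketch` and `withRetry` are hypothetical names, and the sketch assumes that maxRetry counts the additional attempts made after the first failure and that retryInterval is the pause between attempts.

```java
import java.util.function.Supplier;

// Hypothetical illustration only: not the connector's retry logic.
final class RetrySketch {

  // Runs `write`; on failure, retries up to `maxRetry` more times,
  // pausing `retryIntervalMillis` before each new attempt.
  static <T> T withRetry(Supplier<T> write, int maxRetry, long retryIntervalMillis) {
    int retries = 0;
    while (true) {
      try {
        return write.get();
      } catch (RuntimeException e) {
        if (retries >= maxRetry) {
          throw e; // retries exhausted: surface the last failure
        }
        retries++;
        pause(retryIntervalMillis);
      }
    }
  }

  private static void pause(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting to retry", ie);
    }
  }

  public static void main(String[] args) {
    int[] calls = {0};
    String result =
        withRetry(
            () -> {
              // Fail on the first call only, to simulate a transient error.
              if (calls[0]++ == 0) throw new IllegalStateException("transient failure");
              return "written";
            },
            1,
            10L);
    System.out.println(result + " after " + calls[0] + " attempts"); // prints "written after 2 attempts"
  }
}
```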

Using OrientDB as a Flow

You can also build flow stages. The API is similar to creating Sinks.

Scala (flow)
val f1 = OrientDBSource(
  source,
  OrientDBSourceSettings(oDatabasePool = oDatabase)
).map { message: OOutgoingMessage[ODocument] =>
    OIncomingMessage(message.oDocument)
  }
  .via(
    OrientDBFlow.create(
      sink5,
      OrientDBUpdateSettings(oDatabasePool = oDatabase)
    )
  )
  .runWith(Sink.seq)
Java (flow)
CompletionStage<List<List<OIncomingMessage<ODocument, NotUsed>>>> f1 =
    OrientDBSource.create(source, OrientDBSourceSettings.create(oDatabase), null)
        .map(m -> OIncomingMessage.create(m.oDocument()))
        .via(OrientDBFlow.create(sink3, OrientDBUpdateSettings.create(oDatabase)))
        .runWith(Sink.seq(), materializer);

Passing data through OrientDBFlow

When streaming documents from Kafka, you might want to commit to Kafka AFTER the document has been written to OrientDB.

Scala
// We're going to pretend we got messages from Kafka.
// After we've written them to OrientDB, we want
// to commit the offset to Kafka

case class KafkaOffset(offset: Int)
case class KafkaMessage(book: Book, offset: KafkaOffset)

val messagesFromKafka = List(
  KafkaMessage(Book("Book 1"), KafkaOffset(0)),
  KafkaMessage(Book("Book 2"), KafkaOffset(1)),
  KafkaMessage(Book("Book 3"), KafkaOffset(2))
)

var committedOffsets = List[KafkaOffset]()

def commitToKafka(offset: KafkaOffset): Unit =
  committedOffsets = committedOffsets :+ offset

val f1 = Source(messagesFromKafka)
  .map { kafkaMessage: KafkaMessage =>
    val book = kafkaMessage.book
    val id = book.title
    println("title: " + book.title)

    OIncomingMessage(new ODocument().field("book_title", id), kafkaMessage.offset)
  }
  .via(
    OrientDBFlow.createWithPassThrough(
      sink7,
      OrientDBUpdateSettings(oDatabase)
    )
  )
  .map { messages: Seq[OIncomingMessage[ODocument, KafkaOffset]] =>
    messages.foreach { message =>
      commitToKafka(message.passThrough)
    }
  }
  .runWith(Sink.seq)

Await.ready(f1, Duration.Inf)
Java
// We're going to pretend we got messages from Kafka.
// After we've written them to OrientDB, we want
// to commit the offset to Kafka

List<Integer> committedOffsets = new ArrayList<>();
List<messagesFromKafka> messagesFromKafkas =
    Arrays.asList(
        new messagesFromKafka("Akka Concurrency", new KafkaOffset(0)),
        new messagesFromKafka("Akka in Action", new KafkaOffset(1)),
        new messagesFromKafka("Effective Akka", new KafkaOffset(2)));

Consumer<KafkaOffset> commitToKafka =
    new Consumer<KafkaOffset>() {
      @Override
      public void accept(KafkaOffset kafkaOffset) {
        committedOffsets.add(kafkaOffset.getOffset());
      }
    };

Source.from(messagesFromKafkas)
    .map(
        kafkaMessage -> {
          String book_title = kafkaMessage.getBook_title();
          return OIncomingMessage.create(
              new ODocument().field("book_title", book_title), kafkaMessage.kafkaOffset);
        })
    .via(OrientDBFlow.createWithPassThrough(sink6, OrientDBUpdateSettings.create(oDatabase)))
    .map(
        messages -> {
          ODatabaseDocumentTx db = oDatabase.acquire();
          db.setDatabaseOwner(new OObjectDatabaseTx(db));
          ODatabaseRecordThreadLocal.instance().set(db);
          messages
              .stream()
              .forEach(
                  message -> {
                    commitToKafka.accept(((KafkaOffset) message.passThrough()));
                  });
          return NotUsed.getInstance();
        })
    .runWith(Sink.seq(), materializer)
    .toCompletableFuture()
    .get(60, TimeUnit.SECONDS);
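
Stripped of Akka Streams and OrientDB, the pass-through idea reduces to carrying a piece of context next to each payload and acting on it only after the write has happened. The plain-Java sketch below illustrates that ordering. All names in it (`PassThroughSketch`, `Message`, `writeAll`, `writeThenCommit`) are hypothetical stand-ins and not the connector's classes; the "database" is just an in-memory list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only: these types mimic the idea, not the connector's API.
final class PassThroughSketch {

  // A payload paired with context that must survive the write step untouched.
  static final class Message<T, C> {
    final T payload;
    final C passThrough;

    Message(T payload, C passThrough) {
      this.payload = payload;
      this.passThrough = passThrough;
    }
  }

  // Stand-in for the database write: stores the payloads, then emits the same messages.
  static <T, C> List<Message<T, C>> writeAll(List<Message<T, C>> batch, List<T> store) {
    for (Message<T, C> message : batch) {
      store.add(message.payload);
    }
    return batch;
  }

  // Writes first, then collects the offsets that are now safe to commit.
  static List<Integer> writeThenCommit(List<Message<String, Integer>> batch, List<String> store) {
    List<Integer> committed = new ArrayList<>();
    for (Message<String, Integer> written : writeAll(batch, store)) {
      committed.add(written.passThrough);
    }
    return committed;
  }

  public static void main(String[] args) {
    List<String> store = new ArrayList<>();
    List<Message<String, Integer>> batch =
        Arrays.asList(
            new Message<String, Integer>("Book 1", 0),
            new Message<String, Integer>("Book 2", 1),
            new Message<String, Integer>("Book 3", 2));
    System.out.println(writeThenCommit(batch, store)); // prints [0, 1, 2]
    System.out.println(store); // prints [Book 1, Book 2, Book 3]
  }
}
```

Because the offsets are read only from messages that the write step has already emitted, a failed write would prevent the corresponding commit in this sketch.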

Running the example code

The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.

> Test code requires an OrientDB server running in the background. You can start one quickly using Docker:
>
> docker run --rm -p 2424:2424 orientdb:latest

Scala
sbt
> orientdb/testOnly *.OrientDBSpec
Java
sbt
> orientdb/testOnly *.OrientDBTest