Walkthrough
Setting up
To get started, you must obtain or write the .proto
file(s) that describe the interface you want to implement and add those files to your project. Add .proto
files to your project’s src/main/protobuf
src/main/proto
src/main/proto
directory. (See the detailed chapters on sbt, Gradle and Maven for information on taking .proto definitions from dependencies)
Then add the Akka gRPC plugin to your build:
- sbt
-
// in project/plugins.sbt: addSbtPlugin("com.lightbend.akka.grpc" % "sbt-akka-grpc" % "1.2.5") // // in build.sbt: enablePlugins(AkkaGrpcPlugin)
- Gradle
-
buildscript { repositories { mavenLocal() gradlePluginPortal() } dependencies { // see https://plugins.gradle.org/plugin/com.lightbend.akka.grpc.gradle // for the currently latest version. classpath 'gradle.plugin.com.lightbend.akka.grpc:akka-grpc-gradle-plugin:1.2.5' } } plugins { id 'java' id 'application' } apply plugin: 'com.lightbend.akka.grpc.gradle' repositories { mavenLocal() mavenCentral() }
- Maven
-
<project> <modelVersion>4.0.0</modelVersion> <name>Project name</name> <groupId>com.example</groupId> <artifactId>my-grpc-app</artifactId> <version>0.1-SNAPSHOT</version> <properties> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <akka.grpc.version>1.2.5</akka.grpc.version> <grpc.version>1.47.0</grpc.version> <project.encoding>UTF-8</project.encoding> </properties> <dependencies> <dependency> <groupId>com.lightbend.akka.grpc</groupId> <artifactId>akka-grpc-runtime_2.12</artifactId> <version>${akka.grpc.version}</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>com.lightbend.akka.grpc</groupId> <artifactId>akka-grpc-maven-plugin</artifactId> <version>${akka.grpc.version}</version> <executions> <execution> <goals> <goal>generate</goal> </goals> </execution> </executions> </plugin> </plugins> </build> </project>
For a complete overview of the configuration options see the chapter for your build tool, sbt, Gradle or Maven.
Dependencies
The Akka gRPC plugin makes your code depend on the akka-grpc-runtime
library.
The table below shows direct dependencies of it and the second tab shows all libraries it depends on transitively. Be aware that the io.grpc.grpc-api
library depends on Guava.
- Direct dependencies
Organization Artifact Version com.google.protobuf protobuf-java 3.20.1 com.thesamet.scalapb scalapb-runtime_2.12 0.11.11 com.typesafe.akka akka-discovery_2.12 2.6.19 com.typesafe.akka akka-http-core_2.12 10.2.9 com.typesafe.akka akka-http_2.12 10.2.9 com.typesafe.akka akka-stream_2.12 2.6.19 io.grpc grpc-core 1.47.0 io.grpc grpc-netty-shaded 1.47.0 org.scala-lang.modules scala-collection-compat_2.12 2.7.0 org.scala-lang scala-library 2.12.16 - Dependency tree
com.google.protobuf protobuf-java 3.20.1 com.thesamet.scalapb scalapb-runtime_2.12 0.11.11 Apache 2 com.google.protobuf protobuf-java 3.20.1 com.thesamet.scalapb lenses_2.12 0.11.11 Apache 2 org.scala-lang.modules scala-collection-compat_2.12 2.7.0 Apache-2.0 org.scala-lang scala-library 2.12.16 Apache-2.0 org.scala-lang scala-library 2.12.16 Apache-2.0 org.scala-lang.modules scala-collection-compat_2.12 2.7.0 Apache-2.0 org.scala-lang scala-library 2.12.16 Apache-2.0 org.scala-lang scala-library 2.12.16 Apache-2.0 com.typesafe.akka akka-discovery_2.12 2.6.19 Apache-2.0 com.typesafe.akka akka-actor_2.12 2.6.19 Apache-2.0 com.typesafe config 1.4.2 Apache-2.0 org.scala-lang.modules scala-java8-compat_2.12 0.8.0 BSD 3-clause org.scala-lang scala-library 2.12.16 Apache-2.0 org.scala-lang scala-library 2.12.16 Apache-2.0 org.scala-lang scala-library 2.12.16 Apache-2.0 com.typesafe.akka akka-http-core_2.12 10.2.9 Apache-2.0 com.typesafe.akka akka-parsing_2.12 10.2.9 Apache-2.0 org.scala-lang scala-library 2.12.16 Apache-2.0 org.scala-lang scala-library 2.12.16 Apache-2.0 com.typesafe.akka akka-http_2.12 10.2.9 Apache-2.0 com.typesafe.akka akka-http-core_2.12 10.2.9 Apache-2.0 com.typesafe.akka akka-parsing_2.12 10.2.9 Apache-2.0 org.scala-lang scala-library 2.12.16 Apache-2.0 org.scala-lang scala-library 2.12.16 Apache-2.0 org.scala-lang scala-library 2.12.16 Apache-2.0 com.typesafe.akka akka-stream_2.12 2.6.19 Apache-2.0 com.typesafe.akka akka-actor_2.12 2.6.19 Apache-2.0 com.typesafe config 1.4.2 Apache-2.0 org.scala-lang.modules scala-java8-compat_2.12 0.8.0 BSD 3-clause org.scala-lang scala-library 2.12.16 Apache-2.0 org.scala-lang scala-library 2.12.16 Apache-2.0 com.typesafe.akka akka-protobuf-v3_2.12 2.6.19 Apache-2.0 com.typesafe ssl-config-core_2.12 0.4.3 Apache-2.0 com.typesafe config 1.4.2 Apache-2.0 org.scala-lang.modules scala-parser-combinators_2.12 1.1.2 Apache-2.0 org.scala-lang scala-library 2.12.16 Apache-2.0 org.scala-lang scala-library 2.12.16 Apache-2.0 org.reactivestreams reactive-streams 1.0.3 CC0 org.scala-lang scala-library 2.12.16 Apache-2.0 io.grpc grpc-core 1.47.0 Apache 2.0 com.google.android annotations 4.1.1.4 Apache 2.0 com.google.code.gson gson 2.9.0 Apache-2.0 com.google.errorprone error_prone_annotations 2.10.0 Apache 2.0 com.google.guava guava 31.0.1-android com.google.code.findbugs jsr305 3.0.2 The Apache Software License, Version 2.0 com.google.errorprone error_prone_annotations 2.10.0 Apache 2.0 com.google.guava failureaccess 1.0.1 com.google.guava listenablefuture 9999.0-empty-to-avoid-conflict-with-guava com.google.j2objc j2objc-annotations 1.3 The Apache Software License, Version 2.0 org.checkerframework checker-compat-qual 2.5.5 GNU General Public License, version 2 (GPL2), with the classpath exception org.checkerframework checker-qual 3.12.0 The MIT License io.grpc grpc-api 1.47.0 Apache 2.0 com.google.code.findbugs jsr305 3.0.2 The Apache Software License, Version 2.0 com.google.errorprone error_prone_annotations 2.10.0 Apache 2.0 com.google.guava guava 31.0.1-android com.google.code.findbugs jsr305 3.0.2 The Apache Software License, Version 2.0 com.google.errorprone error_prone_annotations 2.10.0 Apache 2.0 com.google.guava failureaccess 1.0.1 com.google.guava listenablefuture 9999.0-empty-to-avoid-conflict-with-guava com.google.j2objc j2objc-annotations 1.3 The Apache Software License, Version 2.0 org.checkerframework checker-compat-qual 2.5.5 GNU General Public License, version 2 (GPL2), with the classpath exception org.checkerframework checker-qual 3.12.0 The MIT License io.grpc grpc-context 1.47.0 Apache 2.0 io.perfmark perfmark-api 0.25.0 Apache 2.0 org.codehaus.mojo animal-sniffer-annotations 1.19 io.grpc grpc-netty-shaded 1.47.0 Apache 2.0 com.google.errorprone error_prone_annotations 2.10.0 Apache 2.0 com.google.guava guava 31.0.1-android com.google.code.findbugs jsr305 3.0.2 The Apache Software License, Version 2.0 com.google.errorprone error_prone_annotations 2.10.0 Apache 2.0 com.google.guava failureaccess 1.0.1 com.google.guava listenablefuture 9999.0-empty-to-avoid-conflict-with-guava com.google.j2objc j2objc-annotations 1.3 The Apache Software License, Version 2.0 org.checkerframework checker-compat-qual 2.5.5 GNU General Public License, version 2 (GPL2), with the classpath exception org.checkerframework checker-qual 3.12.0 The MIT License io.grpc grpc-core 1.47.0 Apache 2.0 com.google.android annotations 4.1.1.4 Apache 2.0 com.google.code.gson gson 2.9.0 Apache-2.0 com.google.errorprone error_prone_annotations 2.10.0 Apache 2.0 com.google.guava guava 31.0.1-android com.google.code.findbugs jsr305 3.0.2 The Apache Software License, Version 2.0 com.google.errorprone error_prone_annotations 2.10.0 Apache 2.0 com.google.guava failureaccess 1.0.1 com.google.guava listenablefuture 9999.0-empty-to-avoid-conflict-with-guava com.google.j2objc j2objc-annotations 1.3 The Apache Software License, Version 2.0 org.checkerframework checker-compat-qual 2.5.5 GNU General Public License, version 2 (GPL2), with the classpath exception org.checkerframework checker-qual 3.12.0 The MIT License io.grpc grpc-api 1.47.0 Apache 2.0 com.google.code.findbugs jsr305 3.0.2 The Apache Software License, Version 2.0 com.google.errorprone error_prone_annotations 2.10.0 Apache 2.0 com.google.guava guava 31.0.1-android com.google.code.findbugs jsr305 3.0.2 The Apache Software License, Version 2.0 com.google.errorprone error_prone_annotations 2.10.0 Apache 2.0 com.google.guava failureaccess 1.0.1 com.google.guava listenablefuture 9999.0-empty-to-avoid-conflict-with-guava com.google.j2objc j2objc-annotations 1.3 The Apache Software License, Version 2.0 org.checkerframework checker-compat-qual 2.5.5 GNU General Public License, version 2 (GPL2), with the classpath exception org.checkerframework checker-qual 3.12.0 The MIT License io.grpc grpc-context 1.47.0 Apache 2.0 io.perfmark perfmark-api 0.25.0 Apache 2.0 org.codehaus.mojo animal-sniffer-annotations 1.19 io.perfmark perfmark-api 0.25.0 Apache 2.0 org.scala-lang.modules scala-collection-compat_2.12 2.7.0 Apache-2.0 org.scala-lang scala-library 2.12.16 Apache-2.0 org.scala-lang scala-library 2.12.16 Apache-2.0
Writing a service definition
Define the interfaces you want to implement in your project’s src/main/protobuf
src/main/proto
src/main/proto
file(s).
For example, this is the definition of a Hello World service:
sourcesyntax = "proto3";
import "google/protobuf/timestamp.proto";
option java_multiple_files = true;
option java_package = "example.myapp.helloworld.grpc";
option java_outer_classname = "HelloWorldProto";
package helloworld;
////////////////////////////////////// The greeting service definition.
service GreeterService {
//////////////////////
// Sends a greeting //
////////*****/////////
// HELLO //
////////*****/////////
rpc SayHello (HelloRequest) returns (HelloReply) {}
// Comment spanning
// on several lines
rpc ItKeepsTalking (stream HelloRequest) returns (HelloReply) {}
/*
* C style comments
*/
rpc ItKeepsReplying (HelloRequest) returns (stream HelloReply) {}
/* C style comments
* on several lines
* with non-empty heading/trailing line */
rpc StreamHellos (stream HelloRequest) returns (stream HelloReply) {}
}
// The request message containing the user's name.
message HelloRequest {
string name = 1;
}
// The response message containing the greetings
message HelloReply {
string message = 1;
google.protobuf.Timestamp timestamp = 2;
}
Generating interfaces and stubs
Start by generating code from the .proto
definition with:
- sbt
-
sbt compile
- Gradle
-
./gradlew build
- Maven
-
mvn akka-grpc:generate
From the above definition, Akka gRPC generates interfaces that look like this:
- Scala
-
source
// Generated by Akka gRPC. DO NOT EDIT. package example.myapp.helloworld.grpc import akka.annotation.ApiMayChange import akka.grpc.AkkaGrpcGenerated /** * #services * //////////////////////////////////// The greeting service definition. */ @AkkaGrpcGenerated trait GreeterService { /** * //////////////////// * Sends a greeting // * //////*****///////// * HELLO // * //////*****///////// */ def sayHello(in: example.myapp.helloworld.grpc.HelloRequest): scala.concurrent.Future[example.myapp.helloworld.grpc.HelloReply] /** * Comment spanning * on several lines */ def itKeepsTalking(in: akka.stream.scaladsl.Source[example.myapp.helloworld.grpc.HelloRequest, akka.NotUsed]): scala.concurrent.Future[example.myapp.helloworld.grpc.HelloReply] /** * C style comments */ def itKeepsReplying(in: example.myapp.helloworld.grpc.HelloRequest): akka.stream.scaladsl.Source[example.myapp.helloworld.grpc.HelloReply, akka.NotUsed] /** * C style comments * on several lines * with non-empty heading/trailing line */ def streamHellos(in: akka.stream.scaladsl.Source[example.myapp.helloworld.grpc.HelloRequest, akka.NotUsed]): akka.stream.scaladsl.Source[example.myapp.helloworld.grpc.HelloReply, akka.NotUsed] } @AkkaGrpcGenerated object GreeterService extends akka.grpc.ServiceDescription { val name = "helloworld.GreeterService" val descriptor: com.google.protobuf.Descriptors.FileDescriptor = example.myapp.helloworld.grpc.HelloworldProto.javaDescriptor; object Serializers { import akka.grpc.scaladsl.ScalapbProtobufSerializer val HelloRequestSerializer = new ScalapbProtobufSerializer(example.myapp.helloworld.grpc.HelloRequest.messageCompanion) val HelloReplySerializer = new ScalapbProtobufSerializer(example.myapp.helloworld.grpc.HelloReply.messageCompanion) } @ApiMayChange @AkkaGrpcGenerated object MethodDescriptors { import akka.grpc.internal.Marshaller import io.grpc.MethodDescriptor import Serializers._ val sayHelloDescriptor: MethodDescriptor[example.myapp.helloworld.grpc.HelloRequest, example.myapp.helloworld.grpc.HelloReply] = MethodDescriptor.newBuilder() .setType( MethodDescriptor.MethodType.UNARY ) .setFullMethodName(MethodDescriptor.generateFullMethodName("helloworld.GreeterService", "SayHello")) .setRequestMarshaller(new Marshaller(HelloRequestSerializer)) .setResponseMarshaller(new Marshaller(HelloReplySerializer)) .setSampledToLocalTracing(true) .build() val itKeepsTalkingDescriptor: MethodDescriptor[example.myapp.helloworld.grpc.HelloRequest, example.myapp.helloworld.grpc.HelloReply] = MethodDescriptor.newBuilder() .setType( MethodDescriptor.MethodType.CLIENT_STREAMING ) .setFullMethodName(MethodDescriptor.generateFullMethodName("helloworld.GreeterService", "ItKeepsTalking")) .setRequestMarshaller(new Marshaller(HelloRequestSerializer)) .setResponseMarshaller(new Marshaller(HelloReplySerializer)) .setSampledToLocalTracing(true) .build() val itKeepsReplyingDescriptor: MethodDescriptor[example.myapp.helloworld.grpc.HelloRequest, example.myapp.helloworld.grpc.HelloReply] = MethodDescriptor.newBuilder() .setType( MethodDescriptor.MethodType.SERVER_STREAMING ) .setFullMethodName(MethodDescriptor.generateFullMethodName("helloworld.GreeterService", "ItKeepsReplying")) .setRequestMarshaller(new Marshaller(HelloRequestSerializer)) .setResponseMarshaller(new Marshaller(HelloReplySerializer)) .setSampledToLocalTracing(true) .build() val streamHellosDescriptor: MethodDescriptor[example.myapp.helloworld.grpc.HelloRequest, example.myapp.helloworld.grpc.HelloReply] = MethodDescriptor.newBuilder() .setType( MethodDescriptor.MethodType.BIDI_STREAMING ) .setFullMethodName(MethodDescriptor.generateFullMethodName("helloworld.GreeterService", "StreamHellos")) .setRequestMarshaller(new Marshaller(HelloRequestSerializer)) .setResponseMarshaller(new Marshaller(HelloReplySerializer)) .setSampledToLocalTracing(true) .build() } } - Java
-
source
// Generated by Akka gRPC. DO NOT EDIT. package example.myapp.helloworld.grpc; import akka.grpc.ProtobufSerializer; import akka.grpc.javadsl.GoogleProtobufSerializer; import akka.grpc.AkkaGrpcGenerated; /** * //////////////////////////////////// The greeting service definition. */ public interface GreeterService { /** * //////////////////// * Sends a greeting // * //////*****///////// * HELLO // * //////*****///////// */ java.util.concurrent.CompletionStage<example.myapp.helloworld.grpc.HelloReply> sayHello(example.myapp.helloworld.grpc.HelloRequest in); /** * Comment spanning * on several lines */ java.util.concurrent.CompletionStage<example.myapp.helloworld.grpc.HelloReply> itKeepsTalking(akka.stream.javadsl.Source<example.myapp.helloworld.grpc.HelloRequest, akka.NotUsed> in); /** * C style comments */ akka.stream.javadsl.Source<example.myapp.helloworld.grpc.HelloReply, akka.NotUsed> itKeepsReplying(example.myapp.helloworld.grpc.HelloRequest in); /** * C style comments * on several lines * with non-empty heading/trailing line */ akka.stream.javadsl.Source<example.myapp.helloworld.grpc.HelloReply, akka.NotUsed> streamHellos(akka.stream.javadsl.Source<example.myapp.helloworld.grpc.HelloRequest, akka.NotUsed> in); static String name = "helloworld.GreeterService"; static akka.grpc.ServiceDescription description = new akka.grpc.internal.ServiceDescriptionImpl(name, HelloWorldProto.getDescriptor()); @AkkaGrpcGenerated public static class Serializers { public static ProtobufSerializer<example.myapp.helloworld.grpc.HelloRequest> HelloRequestSerializer = new GoogleProtobufSerializer<>(example.myapp.helloworld.grpc.HelloRequest.parser()); public static ProtobufSerializer<example.myapp.helloworld.grpc.HelloReply> HelloReplySerializer = new GoogleProtobufSerializer<>(example.myapp.helloworld.grpc.HelloReply.parser()); } }
and model case classes for HelloRequest
and HelloResponse
.
The service interface is the same for the client and the server side. On the server side, the service implements the interface, on the client side the Akka gRPC infrastructure implements a stub that will connect to the remote service when called.
There are 4 different types of calls:
- unary call - single request that returns a
Future
CompletionStage
with a single response, seesayHello
in above example - client streaming call -
Source
(stream) of requests from the client that returns aFuture
CompletionStage
with a single response, seeitKeepsTalking
in above example - server streaming call - single request that returns a
Source
(stream) of responses, seeitKeepsReplying
in above example - client and server streaming call -
Source
(stream) of requests from the client that returns aSource
(stream) of responses, seestreamHellos
in above example
Implementing the service
Let’s implement these 4 calls in a new class:
- Scala
-
source
package example.myapp.helloworld import scala.concurrent.Future import akka.NotUsed import akka.stream.Materializer import akka.stream.scaladsl.Sink import akka.stream.scaladsl.Source import com.google.protobuf.timestamp.Timestamp import example.myapp.helloworld.grpc._ class GreeterServiceImpl(implicit mat: Materializer) extends GreeterService { import mat.executionContext override def sayHello(in: HelloRequest): Future[HelloReply] = { println(s"sayHello to ${in.name}") Future.successful(HelloReply(s"Hello, ${in.name}", Some(Timestamp.apply(123456, 123)))) } override def itKeepsTalking(in: Source[HelloRequest, NotUsed]): Future[HelloReply] = { println(s"sayHello to in stream...") in.runWith(Sink.seq).map(elements => HelloReply(s"Hello, ${elements.map(_.name).mkString(", ")}")) } override def itKeepsReplying(in: HelloRequest): Source[HelloReply, NotUsed] = { println(s"sayHello to ${in.name} with stream of chars...") Source(s"Hello, ${in.name}".toList).map(character => HelloReply(character.toString)) } override def streamHellos(in: Source[HelloRequest, NotUsed]): Source[HelloReply, NotUsed] = { println(s"sayHello to stream...") in.map(request => HelloReply(s"Hello, ${request.name}")) } }
- Java
-
source
package example.myapp.helloworld; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.stream.Collectors; import akka.NotUsed; import akka.stream.Materializer; import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; import com.google.protobuf.Timestamp; import example.myapp.helloworld.grpc.*; public class GreeterServiceImpl implements GreeterService { private final Materializer mat; public GreeterServiceImpl(Materializer mat) { this.mat = mat; } @Override public CompletionStage<HelloReply> sayHello(HelloRequest in) { System.out.println("sayHello to " + in.getName()); HelloReply reply = HelloReply.newBuilder() .setMessage("Hello, " + in.getName()) .setTimestamp(Timestamp.newBuilder().setSeconds(1234567890).setNanos(12345).build()) .build(); return CompletableFuture.completedFuture(reply); } @Override public CompletionStage<HelloReply> itKeepsTalking(Source<HelloRequest, NotUsed> in) { System.out.println("sayHello to in stream..."); return in.runWith(Sink.seq(), mat) .thenApply(elements -> { String elementsStr = elements.stream().map(elem -> elem.getName()) .collect(Collectors.toList()).toString(); return HelloReply.newBuilder().setMessage("Hello, " + elementsStr).build(); }); } @Override public Source<HelloReply, NotUsed> itKeepsReplying(HelloRequest in) { System.out.println("sayHello to " + in.getName() + " with stream of chars"); List<Character> characters = ("Hello, " + in.getName()) .chars().mapToObj(c -> (char) c).collect(Collectors.toList()); return Source.from(characters) .map(character -> { return HelloReply.newBuilder().setMessage(String.valueOf(character)).build(); }); } @Override public Source<HelloReply, NotUsed> streamHellos(Source<HelloRequest, NotUsed> in) { System.out.println("sayHello to stream..."); return in.map(request -> HelloReply.newBuilder().setMessage("Hello, " + request.getName()).build()); } }
Serving the service with Akka HTTP
Note, how the implementation we just wrote is free from any gRPC related boilerplate. It only uses the generated model and interfaces from your domain and basic Akka streams classes. We now need to connect this implementation class to the web server to offer it to clients.
Akka gRPC servers are implemented with Akka HTTP. In addition to the above GreeterService
, a GreeterServiceHandler
GreeterServiceHandlerFactory
was generated that wraps the implementation with the gRPC functionality to be plugged into an existing Akka HTTP server app.
You create the request handler by calling GreeterServiceHandler(yourImpl)
GreeterServiceHandlerFactory.create(yourImpl, ...)
.
The server will reuse the given instance of the implementation, which means that it is shared between (potentially concurrent) requests. Make sure that the implementation is thread-safe. In the sample above there is no mutable state, so it is safe. For more information about safely implementing servers with state see the advice about stateful below.
A complete main program that starts an Akka HTTP server with the GreeterService
looks like this:
- Scala
-
source
package example.myapp.helloworld import akka.actor.ActorSystem import akka.http.scaladsl.model.{ HttpRequest, HttpResponse } import akka.http.scaladsl.Http import com.typesafe.config.ConfigFactory import example.myapp.helloworld.grpc._ import scala.concurrent.{ ExecutionContext, Future } object GreeterServer { def main(args: Array[String]): Unit = { // Important: enable HTTP/2 in ActorSystem's config // We do it here programmatically, but you can also set it in the application.conf val conf = ConfigFactory .parseString("akka.http.server.preview.enable-http2 = on") .withFallback(ConfigFactory.defaultApplication()) val system = ActorSystem("HelloWorld", conf) new GreeterServer(system).run() // ActorSystem threads will keep the app alive until `system.terminate()` is called } } class GreeterServer(system: ActorSystem) { def run(): Future[Http.ServerBinding] = { // Akka boot up code implicit val sys: ActorSystem = system implicit val ec: ExecutionContext = sys.dispatcher // Create service handlers val service: HttpRequest => Future[HttpResponse] = GreeterServiceHandler(new GreeterServiceImpl()) // Bind service handler servers to localhost:8080/8081 val binding = Http().newServerAt("127.0.0.1", 8080).bind(service) // report successful binding binding.foreach { binding => println(s"gRPC server bound to: ${binding.localAddress}") } binding } }
- Java
-
source
package example.myapp.helloworld; import akka.actor.ActorSystem; import akka.http.javadsl.ConnectHttp; import akka.http.javadsl.Http; import akka.http.javadsl.ServerBinding; import akka.stream.SystemMaterializer; import akka.stream.Materializer; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import example.myapp.helloworld.grpc.GreeterService; import example.myapp.helloworld.grpc.GreeterServiceHandlerFactory; import java.util.concurrent.CompletionStage; class GreeterServer { public static void main(String[] args) throws Exception { // important to enable HTTP/2 in ActorSystem's config Config conf = ConfigFactory.parseString("akka.http.server.preview.enable-http2 = on") .withFallback(ConfigFactory.defaultApplication()); // Akka ActorSystem Boot ActorSystem sys = ActorSystem.create("HelloWorld", conf); run(sys).thenAccept(binding -> { System.out.println("gRPC server bound to: " + binding.localAddress()); }); // ActorSystem threads will keep the app alive until `system.terminate()` is called } public static CompletionStage<ServerBinding> run(ActorSystem sys) throws Exception { Materializer mat = SystemMaterializer.get(sys).materializer(); // Instantiate implementation GreeterService impl = new GreeterServiceImpl(mat); return Http .get(sys) .newServerAt("127.0.0.1", 8090) .bind(GreeterServiceHandlerFactory.create(impl, sys)); } }
It’s important to enable HTTP/2 in Akka HTTP in the configuration of the ActorSystem
by setting
akka.http.server.preview.enable-http2 = on
In the example this was done from the main
method, but you could also do this from within your application.conf
.
The above example does not use TLS. Find more about how to Serve gRPC over TLS on the deployment section.
Serving multiple services
When a server handles several services the handlers must be combined with akka.grpc.scaladsl.ServiceHandler.concatOrNotFound
akka.grpc.javadsl.ServiceHandler.concatOrNotFound
:
- Scala
-
source
import akka.grpc.scaladsl.ServiceHandler // explicit types not needed but included in example for clarity val greeterService: PartialFunction[HttpRequest, Future[HttpResponse]] = example.myapp.helloworld.grpc.GreeterServiceHandler.partial(new GreeterServiceImpl()) val echoService: PartialFunction[HttpRequest, Future[HttpResponse]] = EchoServiceHandler.partial(new EchoServiceImpl) val reflectionService = ServerReflection.partial(List(GreeterService, EchoService)) val serviceHandlers: HttpRequest => Future[HttpResponse] = ServiceHandler.concatOrNotFound(greeterService, echoService, reflectionService) Http() .newServerAt("127.0.0.1", 8080) .bind(serviceHandlers)
- Java
-
source
import akka.grpc.javadsl.ServiceHandler; import akka.http.javadsl.model.HttpRequest; import akka.http.javadsl.model.HttpResponse; import akka.japi.function.Function; Function<HttpRequest, CompletionStage<HttpResponse>> greeterService = GreeterServiceHandlerFactory.create(new GreeterServiceImpl(mat), sys); Function<HttpRequest, CompletionStage<HttpResponse>> echoService = EchoServiceHandlerFactory.create(new EchoServiceImpl(), sys); @SuppressWarnings("unchecked") Function<HttpRequest, CompletionStage<HttpResponse>> serviceHandlers = ServiceHandler.concatOrNotFound(greeterService, echoService); Http.get(sys) .newServerAt("127.0.0.1", 8090) .bind(serviceHandlers)
Note that GreeterServiceHandler.partial
and EchoServiceHandler.partial
are used instead of apply
methods to create partial functions that are combined by concatOrNotFound
.
Running the server
To run the server with HTTP/2 using HTTPS on a JVM prior to version 1.8.0_251, you will likely have to configure the Jetty ALPN agent as described in the Akka HTTP documentation. Later JVM versions have this support built-in.
See the detailed chapters on sbt, Gradle and Maven for details on adding the agent.
Stateful services
More often than not, the whole point of the implementing a service is to keep state. Since the service implementation is shared between concurrent incoming requests any state must be thread safe.
There are two recommended ways to deal with this:
- Put the mutable state inside an actor and interact with it through
ask
from unary methods orFlow.ask
from streams. - Keep the state in a thread-safe place. For example, a CRUD application that is backed by a database is thread-safe when access to the backing database is (which until recently was THE way that applications dealt with request concurrency).
This is an example based on the Hello World above, but allowing users to change the greeting through a unary call:
- Scala
-
source
class GreeterServiceImpl(system: ActorSystem) extends GreeterService { val greeterActor = system.actorOf(GreeterActor.props("Hello"), "greeter") def sayHello(in: HelloRequest): Future[HelloReply] = { // timeout and execution context for ask implicit val timeout: Timeout = 3.seconds import system.dispatcher (greeterActor ? GreeterActor.GetGreeting) .mapTo[GreeterActor.Greeting] .map(message => HelloReply(s"${message.greeting}, ${in.name}")) } def changeGreeting(in: ChangeRequest): Future[ChangeResponse] = { greeterActor ! GreeterActor.ChangeGreeting(in.newGreeting) Future.successful(ChangeResponse()) } }
- Java
-
source
public final class GreeterServiceImpl implements GreeterService { private final ActorSystem system; private final ActorRef greeterActor; public GreeterServiceImpl(ActorSystem system) { this.system = system; this.greeterActor = system.actorOf(GreeterActor.props("Hello"), "greeter"); } public CompletionStage<HelloReply> sayHello(HelloRequest in) { return ask(greeterActor, GreeterActor.GET_GREETING, Duration.ofSeconds(5)) .thenApply(message -> HelloReply.newBuilder() .setMessage(((GreeterActor.Greeting) message).greeting) .build() ); } public CompletionStage<ChangeResponse> changeGreeting(ChangeRequest in) { greeterActor.tell(new GreeterActor.ChangeGreeting(in.getNewGreeting()), ActorRef.noSender()); return CompletableFuture.completedFuture(ChangeResponse.newBuilder().build()); } }
The GreeterActor
is implemented like this:
- Scala
-
source
object GreeterActor { case class ChangeGreeting(newGreeting: String) case object GetGreeting case class Greeting(greeting: String) def props(initialGreeting: String) = Props(new GreeterActor(initialGreeting)) } class GreeterActor(initialGreeting: String) extends Actor { import GreeterActor._ var greeting = Greeting(initialGreeting) def receive = { case GetGreeting => sender() ! greeting case ChangeGreeting(newGreeting) => greeting = Greeting(newGreeting) } }
- Java
-
source
public class GreeterActor extends AbstractActor { public static class ChangeGreeting { public final String newGreeting; public ChangeGreeting(String newGreeting) { this.newGreeting = newGreeting; } } public static class GetGreeting {} public static GetGreeting GET_GREETING = new GetGreeting(); public static class Greeting { public final String greeting; public Greeting(String greeting) { this.greeting = greeting; } } public static Props props(final String initialGreeting) { return Props.create(GreeterActor.class, () -> new GreeterActor(initialGreeting)); } private Greeting greeting; public GreeterActor(String initialGreeting) { greeting = new Greeting(initialGreeting); } public AbstractActor.Receive createReceive() { return receiveBuilder() .match(GetGreeting.class, this::onGetGreeting) .match(ChangeGreeting.class, this::onChangeGreeting) .build(); } private void onGetGreeting(GetGreeting get) { getSender().tell(greeting, getSelf()); } private void onChangeGreeting(ChangeGreeting change) { greeting = new Greeting(change.newGreeting); } }
Now the actor mailbox is used to synchronize accesses to the mutable state.