This documentation covers version 2.4.4; the current version is 2.5.1.
Walkthrough
Setting up
To get started, obtain or write the .proto file(s) that describe the interface you want to implement, and add them to your project’s src/main/protobuf directory (sbt) or src/main/proto directory (Gradle and Maven). (See the detailed chapters on sbt, Gradle and Maven for information on taking .proto definitions from dependencies.)
For a complete overview of the configuration options see the chapter for your build tool, sbt, Gradle or Maven.
Dependencies
The Akka gRPC plugin makes your code depend on the akka-grpc-runtime library.
The table below shows its direct dependencies, and the second tab shows all libraries it depends on transitively. Be aware that the io.grpc.grpc-api library depends on Guava.
Scala:
// Generated by Akka gRPC. DO NOT EDIT.
package example.myapp.helloworld.grpc

import akka.annotation.ApiMayChange
import akka.grpc.AkkaGrpcGenerated

/**
 * #services
 * //////////////////////////////////// The greeting service definition.
 */
@AkkaGrpcGenerated
trait GreeterService {

  /**
   * ////////////////////
   * Sends a greeting //
   * //////*****/////////
   * HELLO //
   * //////*****/////////
   */
  def sayHello(in: example.myapp.helloworld.grpc.HelloRequest): scala.concurrent.Future[example.myapp.helloworld.grpc.HelloReply]

  /**
   * Comment spanning
   * on several lines
   */
  def itKeepsTalking(in: akka.stream.scaladsl.Source[example.myapp.helloworld.grpc.HelloRequest, akka.NotUsed]): scala.concurrent.Future[example.myapp.helloworld.grpc.HelloReply]

  /**
   * C style comments
   */
  def itKeepsReplying(in: example.myapp.helloworld.grpc.HelloRequest): akka.stream.scaladsl.Source[example.myapp.helloworld.grpc.HelloReply, akka.NotUsed]

  /**
   * C style comments
   * on several lines
   * with non-empty heading/trailing line */
  def streamHellos(in: akka.stream.scaladsl.Source[example.myapp.helloworld.grpc.HelloRequest, akka.NotUsed]): akka.stream.scaladsl.Source[example.myapp.helloworld.grpc.HelloReply, akka.NotUsed]
}

@AkkaGrpcGenerated
object GreeterService extends akka.grpc.ServiceDescription {
  val name = "helloworld.GreeterService"

  val descriptor: com.google.protobuf.Descriptors.FileDescriptor =
    example.myapp.helloworld.grpc.HelloworldProto.javaDescriptor;

  object Serializers {
    import akka.grpc.scaladsl.ScalapbProtobufSerializer

    val HelloRequestSerializer = new ScalapbProtobufSerializer(example.myapp.helloworld.grpc.HelloRequest.messageCompanion)
    val HelloReplySerializer = new ScalapbProtobufSerializer(example.myapp.helloworld.grpc.HelloReply.messageCompanion)
  }

  @ApiMayChange
  @AkkaGrpcGenerated
  object MethodDescriptors {
    import akka.grpc.internal.Marshaller
    import io.grpc.MethodDescriptor
    import Serializers._

    val sayHelloDescriptor: MethodDescriptor[example.myapp.helloworld.grpc.HelloRequest, example.myapp.helloworld.grpc.HelloReply] =
      MethodDescriptor.newBuilder()
        .setType(MethodDescriptor.MethodType.UNARY)
        .setFullMethodName(MethodDescriptor.generateFullMethodName("helloworld.GreeterService", "SayHello"))
        .setRequestMarshaller(new Marshaller(HelloRequestSerializer))
        .setResponseMarshaller(new Marshaller(HelloReplySerializer))
        .setSampledToLocalTracing(true)
        .build()

    val itKeepsTalkingDescriptor: MethodDescriptor[example.myapp.helloworld.grpc.HelloRequest, example.myapp.helloworld.grpc.HelloReply] =
      MethodDescriptor.newBuilder()
        .setType(MethodDescriptor.MethodType.CLIENT_STREAMING)
        .setFullMethodName(MethodDescriptor.generateFullMethodName("helloworld.GreeterService", "ItKeepsTalking"))
        .setRequestMarshaller(new Marshaller(HelloRequestSerializer))
        .setResponseMarshaller(new Marshaller(HelloReplySerializer))
        .setSampledToLocalTracing(true)
        .build()

    val itKeepsReplyingDescriptor: MethodDescriptor[example.myapp.helloworld.grpc.HelloRequest, example.myapp.helloworld.grpc.HelloReply] =
      MethodDescriptor.newBuilder()
        .setType(MethodDescriptor.MethodType.SERVER_STREAMING)
        .setFullMethodName(MethodDescriptor.generateFullMethodName("helloworld.GreeterService", "ItKeepsReplying"))
        .setRequestMarshaller(new Marshaller(HelloRequestSerializer))
        .setResponseMarshaller(new Marshaller(HelloReplySerializer))
        .setSampledToLocalTracing(true)
        .build()

    val streamHellosDescriptor: MethodDescriptor[example.myapp.helloworld.grpc.HelloRequest, example.myapp.helloworld.grpc.HelloReply] =
      MethodDescriptor.newBuilder()
        .setType(MethodDescriptor.MethodType.BIDI_STREAMING)
        .setFullMethodName(MethodDescriptor.generateFullMethodName("helloworld.GreeterService", "StreamHellos"))
        .setRequestMarshaller(new Marshaller(HelloRequestSerializer))
        .setResponseMarshaller(new Marshaller(HelloReplySerializer))
        .setSampledToLocalTracing(true)
        .build()
  }
}
Java:
// Generated by Akka gRPC. DO NOT EDIT.
package example.myapp.helloworld.grpc;

import akka.grpc.ProtobufSerializer;
import akka.grpc.javadsl.GoogleProtobufSerializer;
import akka.grpc.AkkaGrpcGenerated;

/**
 * //////////////////////////////////// The greeting service definition.
 */
public interface GreeterService {

  /**
   * ////////////////////
   * Sends a greeting //
   * //////*****/////////
   * HELLO //
   * //////*****/////////
   */
  java.util.concurrent.CompletionStage<example.myapp.helloworld.grpc.HelloReply> sayHello(example.myapp.helloworld.grpc.HelloRequest in);

  java.util.concurrent.CompletionStage<com.google.api.HttpBody> sayHelloHttp(example.myapp.helloworld.grpc.HelloRequest in);

  /**
   * Comment spanning
   * on several lines
   */
  java.util.concurrent.CompletionStage<example.myapp.helloworld.grpc.HelloReply> itKeepsTalking(akka.stream.javadsl.Source<example.myapp.helloworld.grpc.HelloRequest, akka.NotUsed> in);

  /**
   * C style comments
   */
  akka.stream.javadsl.Source<example.myapp.helloworld.grpc.HelloReply, akka.NotUsed> itKeepsReplying(example.myapp.helloworld.grpc.HelloRequest in);

  /**
   * C style comments
   * on several lines
   * with non-empty heading/trailing line */
  akka.stream.javadsl.Source<example.myapp.helloworld.grpc.HelloReply, akka.NotUsed> streamHellos(akka.stream.javadsl.Source<example.myapp.helloworld.grpc.HelloRequest, akka.NotUsed> in);

  static String name = "helloworld.GreeterService";
  static akka.grpc.ServiceDescription description = new akka.grpc.internal.ServiceDescriptionImpl(name, HelloWorldProto.getDescriptor());

  @AkkaGrpcGenerated
  public static class Serializers {
    public static ProtobufSerializer<example.myapp.helloworld.grpc.HelloRequest> HelloRequestSerializer = new GoogleProtobufSerializer<>(example.myapp.helloworld.grpc.HelloRequest.parser());
    public static ProtobufSerializer<example.myapp.helloworld.grpc.HelloReply> HelloReplySerializer = new GoogleProtobufSerializer<>(example.myapp.helloworld.grpc.HelloReply.parser());
    public static ProtobufSerializer<com.google.api.HttpBody> google_api_HttpBodySerializer = new GoogleProtobufSerializer<>(com.google.api.HttpBody.parser());
  }
}
The generated code also includes model case classes (Scala) or classes (Java) for HelloRequest and HelloReply.
The service interface is the same for the client and the server side. On the server side, the service implements the interface; on the client side, the Akka gRPC infrastructure implements a stub that connects to the remote service when called.
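The idea of one interface serving both sides can be sketched with plain JDK types. This is only an illustration: `Greeter`, `GreeterImpl` and `GreeterStub` are hypothetical names, strings stand in for the generated message classes, and the "transport" is an in-process function rather than the HTTP/2 connection a real generated client would use.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

// Hypothetical stand-in for a generated service interface.
interface Greeter {
    CompletionStage<String> sayHello(String name);
}

// Server side: the application implements the interface directly.
final class GreeterImpl implements Greeter {
    @Override
    public CompletionStage<String> sayHello(String name) {
        return CompletableFuture.completedFuture("Hello, " + name);
    }
}

// Client side: a stub implements the same interface by forwarding each call
// to a transport. The transport here is an assumption made for the sketch.
final class GreeterStub implements Greeter {
    private final Function<String, CompletionStage<String>> transport;

    GreeterStub(Function<String, CompletionStage<String>> transport) {
        this.transport = transport;
    }

    @Override
    public CompletionStage<String> sayHello(String name) {
        return transport.apply(name);
    }
}

final class SharedInterfaceDemo {
    public static void main(String[] args) {
        Greeter server = new GreeterImpl();
        // Simulate the remote hop by handing the request straight to the server.
        Greeter client = new GreeterStub(server::sayHello);
        System.out.println(client.sayHello("Alice").toCompletableFuture().join()); // prints "Hello, Alice"
    }
}
```

Because the caller only sees `Greeter`, code written against the interface does not need to know whether it talks to a local implementation or a remote one.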
There are 4 different types of calls:
unary call - single request that returns a Future (Scala) or CompletionStage (Java) with a single response, see sayHello in above example
client streaming call - Source (stream) of requests from the client that returns a Future (Scala) or CompletionStage (Java) with a single response, see itKeepsTalking in above example
server streaming call - single request that returns a Source (stream) of responses, see itKeepsReplying in above example
client and server streaming call - Source (stream) of requests from the client that returns a Source (stream) of responses, see streamHellos in above example
Java:
package example.myapp.helloworld;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;

import akka.NotUsed;
import akka.stream.Materializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import com.google.api.HttpBody;
import com.google.protobuf.BytesValue;
import com.google.protobuf.Timestamp;
import example.myapp.helloworld.grpc.*;

public class GreeterServiceImpl implements GreeterService {
  private final Materializer mat;

  public GreeterServiceImpl(Materializer mat) {
    this.mat = mat;
  }

  @Override
  public CompletionStage<HelloReply> sayHello(HelloRequest in) {
    System.out.println("sayHello to " + in.getName());
    HelloReply reply = HelloReply.newBuilder()
        .setMessage("Hello, " + in.getName())
        .setTimestamp(Timestamp.newBuilder().setSeconds(1234567890).setNanos(12345).build())
        .build();
    return CompletableFuture.completedFuture(reply);
  }

  @Override
  public CompletionStage<HttpBody> sayHelloHttp(HelloRequest in) {
    System.out.println("sayHelloHttp to " + in.getName());
    HttpBody reply = HttpBody.newBuilder()
        .setData(com.google.protobuf.ByteString.copyFrom("test".getBytes()))
        .build();
    return CompletableFuture.completedFuture(reply);
  }

  @Override
  public CompletionStage<HelloReply> itKeepsTalking(Source<HelloRequest, NotUsed> in) {
    System.out.println("sayHello to in stream...");
    return in.runWith(Sink.seq(), mat)
        .thenApply(elements -> {
          String elementsStr = elements.stream()
              .map(elem -> elem.getName())
              .collect(Collectors.toList())
              .toString();
          return HelloReply.newBuilder().setMessage("Hello, " + elementsStr).build();
        });
  }

  @Override
  public Source<HelloReply, NotUsed> itKeepsReplying(HelloRequest in) {
    System.out.println("sayHello to " + in.getName() + " with stream of chars");
    List<Character> characters = ("Hello, " + in.getName())
        .chars().mapToObj(c -> (char) c).collect(Collectors.toList());
    return Source.from(characters)
        .map(character -> {
          return HelloReply.newBuilder().setMessage(String.valueOf(character)).build();
        });
  }

  @Override
  public Source<HelloReply, NotUsed> streamHellos(Source<HelloRequest, NotUsed> in) {
    System.out.println("sayHello to stream...");
    return in.map(request -> HelloReply.newBuilder().setMessage("Hello, " + request.getName()).build());
  }
}
Serving the service with Akka HTTP
Note how the implementation we just wrote is free from any gRPC-related boilerplate. It only uses the generated model and interfaces from your domain and basic Akka streams classes. We now need to connect this implementation class to the web server to offer it to clients.
Akka gRPC servers are implemented with Akka HTTP. In addition to the above GreeterService, a GreeterServiceHandler (Scala) or GreeterServiceHandlerFactory (Java) was generated that wraps the implementation with the gRPC functionality to be plugged into an existing Akka HTTP server app.
You create the request handler by calling GreeterServiceHandler(yourImpl) (Scala) or GreeterServiceHandlerFactory.create(yourImpl, ...) (Java).
Note
The server will reuse the given instance of the implementation, which means that it is shared between (potentially concurrent) requests. Make sure that the implementation is thread-safe. In the sample above there is no mutable state, so it is safe. For more information about safely implementing servers with state, see the advice about stateful services below.
A complete main program that starts an Akka HTTP server with the GreeterService looks like this:
Scala:
package example.myapp.helloworld

import akka.actor.ActorSystem
import akka.http.scaladsl.model.{ HttpRequest, HttpResponse }
import akka.http.scaladsl.Http
import com.typesafe.config.ConfigFactory
import example.myapp.helloworld.grpc._

import scala.concurrent.{ ExecutionContext, Future }

object GreeterServer {
  def main(args: Array[String]): Unit = {
    // Important: enable HTTP/2 in ActorSystem's config
    // We do it here programmatically, but you can also set it in the application.conf
    val conf = ConfigFactory
      .parseString("akka.http.server.enable-http2 = on")
      .withFallback(ConfigFactory.defaultApplication())
    val system = ActorSystem("HelloWorld", conf)
    new GreeterServer(system).run()
    // ActorSystem threads will keep the app alive until `system.terminate()` is called
  }
}

class GreeterServer(system: ActorSystem) {
  def run(): Future[Http.ServerBinding] = {
    // Akka boot up code
    implicit val sys: ActorSystem = system
    implicit val ec: ExecutionContext = sys.dispatcher

    // Create service handlers
    val service: HttpRequest => Future[HttpResponse] =
      GreeterServiceHandler(new GreeterServiceImpl())

    // Bind service handler servers to localhost:8080/8081
    val binding = Http().newServerAt("127.0.0.1", 8080).bind(service)

    // report successful binding
    binding.foreach { binding => println(s"gRPC server bound to: ${binding.localAddress}") }

    binding
  }
}
Java:
package example.myapp.helloworld;

import akka.actor.ActorSystem;
import akka.grpc.javadsl.ServerReflection;
import akka.grpc.javadsl.ServiceHandler;
import akka.http.javadsl.Http;
import akka.http.javadsl.ServerBinding;
import akka.http.javadsl.model.HttpRequest;
import akka.http.javadsl.model.HttpResponse;
import akka.japi.function.Function;
import akka.stream.SystemMaterializer;
import akka.stream.Materializer;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import example.myapp.helloworld.grpc.GreeterService;
import example.myapp.helloworld.grpc.GreeterServiceHandlerFactory;

import java.util.Arrays;
import java.util.concurrent.CompletionStage;

class GreeterServer {
  public static void main(String[] args) throws Exception {
    // important to enable HTTP/2 in ActorSystem's config
    Config conf = ConfigFactory.parseString("akka.http.server.enable-http2 = on")
        .withFallback(ConfigFactory.defaultApplication());

    // Akka ActorSystem Boot
    ActorSystem sys = ActorSystem.create("HelloWorld", conf);

    run(sys).thenAccept(binding -> {
      System.out.println("gRPC server bound to: " + binding.localAddress());
    });

    // ActorSystem threads will keep the app alive until `system.terminate()` is called
  }

  public static CompletionStage<ServerBinding> run(ActorSystem sys) throws Exception {
    Materializer mat = SystemMaterializer.get(sys).materializer();

    // Instantiate implementation
    GreeterService impl = new GreeterServiceImpl(mat);

    // Create the reflection handler for multiple services
    Function<HttpRequest, CompletionStage<HttpResponse>> reflectionPartial =
        ServerReflection.create(Arrays.asList(GreeterService.description), sys);

    Function<HttpRequest, CompletionStage<HttpResponse>> serviceHandlers =
        ServiceHandler.concatOrNotFound(GreeterServiceHandlerFactory.create(impl, sys), reflectionPartial);

    return Http.get(sys)
        .newServerAt("127.0.0.1", 8090)
        .bind(serviceHandlers);
  }
}
Note
It’s important to enable HTTP/2 in Akka HTTP in the configuration of the ActorSystem by setting
akka.http.server.enable-http2 = on
In the example this was done from the main method, but you could also do this from within your application.conf.
The above example does not use TLS. Find out more about how to serve gRPC over TLS in the deployment section.
Serving multiple services
When a server handles several services, the handlers must be combined with akka.grpc.scaladsl.ServiceHandler.concatOrNotFound (Scala) or akka.grpc.javadsl.ServiceHandler.concatOrNotFound (Java):
Scala:
import akka.grpc.scaladsl.ServiceHandler

// explicit types not needed but included in example for clarity
val greeterService: PartialFunction[HttpRequest, Future[HttpResponse]] =
  example.myapp.helloworld.grpc.GreeterServiceHandler.partial(new GreeterServiceImpl())
val echoService: PartialFunction[HttpRequest, Future[HttpResponse]] =
  EchoServiceHandler.partial(new EchoServiceImpl)
val reflectionService = ServerReflection.partial(List(GreeterService, EchoService))
val serviceHandlers: HttpRequest => Future[HttpResponse] =
  ServiceHandler.concatOrNotFound(greeterService, echoService, reflectionService)

Http()
  .newServerAt("127.0.0.1", 8080)
  .bind(serviceHandlers)
Note that GreeterServiceHandler.partial and EchoServiceHandler.partial are used instead of apply methods to create partial functions that are combined by concatOrNotFound.
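To make the idea of combining partial handlers concrete, here is a conceptual sketch in plain JDK Java. It is not Akka's implementation: request paths stand in for HttpRequest, strings stand in for responses, the `RoutingSketch` class is hypothetical, and it assumes each handler claims requests whose path starts with its service name. The `echo.EchoService` name is also an assumption, since the source does not show the package of EchoService.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

final class RoutingSketch {

    // A "partial" handler: it answers only paths under its own service name
    // and returns empty for everything else.
    static Function<String, Optional<String>> partial(String serviceName, String reply) {
        String prefix = "/" + serviceName + "/";
        return path -> path.startsWith(prefix) ? Optional.of(reply) : Optional.empty();
    }

    // Try each partial handler in order; if none matches, answer "404 Not Found".
    static Function<String, String> concatOrNotFound(List<Function<String, Optional<String>>> handlers) {
        return path -> handlers.stream()
            .map(handler -> handler.apply(path))
            .flatMap(Optional::stream)
            .findFirst()
            .orElse("404 Not Found");
    }

    public static void main(String[] args) {
        Function<String, String> combined = concatOrNotFound(List.of(
            partial("helloworld.GreeterService", "handled by greeter"),
            partial("echo.EchoService", "handled by echo")));

        System.out.println(combined.apply("/helloworld.GreeterService/SayHello"));
        System.out.println(combined.apply("/echo.EchoService/Echo"));
        System.out.println(combined.apply("/unknown.Service/Method"));
    }
}
```

A total handler (one that always produces a response) cannot be chained this way, because it would swallow requests meant for the handlers after it; that is why the partial variants are the ones being combined.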
Running the server
See the detailed chapters on sbt, Gradle and Maven for details on adding the agent.
Stateful services
More often than not, the whole point of implementing a service is to keep state. Since the service implementation is shared between concurrent incoming requests, any state must be thread-safe.
There are two recommended ways to deal with this:
Put the mutable state inside an actor and interact with it through ask from unary methods or Flow.ask from streams.
Keep the state in a thread-safe place. For example, a CRUD application that is backed by a database is thread-safe when access to the backing database is (which until recently was THE way that applications dealt with request concurrency).
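As a minimal sketch of the second option, the class below keeps per-name counters in a ConcurrentHashMap, standing in for any thread-safe backing store. It uses only the JDK; `CountingGreeter` and its methods are hypothetical and not part of the generated code, and a real service would implement the generated interface instead of taking plain strings.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class CountingGreeter {
    // Shared mutable state lives in a thread-safe structure.
    private final ConcurrentHashMap<String, Long> greetingsPerName = new ConcurrentHashMap<>();

    CompletionStage<String> sayHello(String name) {
        // merge() updates the counter atomically, so concurrent calls cannot lose increments.
        long count = greetingsPerName.merge(name, 1L, Long::sum);
        return CompletableFuture.completedFuture("Hello, " + name + " (#" + count + ")");
    }

    long timesGreeted(String name) {
        return greetingsPerName.getOrDefault(name, 0L);
    }

    // Calls sayHello("Alice") from several threads at once and returns the final count.
    static long hammer(CountingGreeter greeter, int threads, int callsPerThread) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < callsPerThread; i++) {
                    greeter.sayHello("Alice");
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return greeter.timesGreeted("Alice");
    }

    public static void main(String[] args) {
        System.out.println(hammer(new CountingGreeter(), 8, 1000)); // prints 8000
    }
}
```

With a plain HashMap and a read-then-write update, the same test could lose increments or corrupt the map, which is the kind of bug a shared service instance has to rule out.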
This is an example based on the Hello World above, but allowing users to change the greeting through a unary call: