Akka Extensions

If you want to add features to Akka, there is an elegant yet powerful mechanism for doing so. It's called Akka Extensions and consists of two basic components: an Extension and an ExtensionId.

Each Extension is loaded only once per ActorSystem, and Akka manages that instance. You can choose to have your Extension loaded on demand or at ActorSystem creation time through the Akka configuration. Details on how to make that happen are below, in the "Loading from Configuration" section.
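
The "once per ActorSystem" guarantee can be pictured with a small plain-Java model. This is a sketch and not Akka's implementation: MiniSystem, MiniExtensionId and LoadOnceDemo are hypothetical names that stand in for ActorSystem and ExtensionId, and the model assumes an extension factory does not request other extensions while it runs.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for an ExtensionId: it knows how to build its extension.
interface MiniExtensionId<T> {
  T createExtension(MiniSystem system);
}

// Hypothetical stand-in for an ActorSystem that owns its extensions.
class MiniSystem {
  private final ConcurrentMap<MiniExtensionId<?>, Object> extensions =
      new ConcurrentHashMap<>();

  // Returns the extension registered for this id, creating it on first use only.
  @SuppressWarnings("unchecked")
  <T> T extension(MiniExtensionId<T> id) {
    return (T) extensions.computeIfAbsent(id, key -> key.createExtension(this));
  }
}

class LoadOnceDemo {
  // Counts how often the factory below actually runs.
  static final AtomicInteger CREATED = new AtomicInteger();

  static final MiniExtensionId<StringBuilder> ID = system -> {
    CREATED.incrementAndGet();
    return new StringBuilder();
  };

  public static void main(String[] args) {
    MiniSystem first = new MiniSystem();
    MiniSystem second = new MiniSystem();
    System.out.println(first.extension(ID) == first.extension(ID));  // true
    System.out.println(first.extension(ID) == second.extension(ID)); // false
    System.out.println(CREATED.get());                               // 2
  }
}
```

The id object acts as the lookup key, which is why an id is normally a single shared instance: two different id objects would register two separate extensions in this model.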

Warning

Since an extension is a way to hook into Akka itself, the implementor of the extension needs to ensure that it is thread-safe.

Building an Extension

So let's create a sample extension that just lets us count the number of times something has happened.

First, we define what our Extension should do:

import akka.actor.*;
import java.util.concurrent.atomic.AtomicLong;
public class CountExtensionImpl implements Extension {
  //Since this Extension is a shared instance
  // per ActorSystem we need to be threadsafe
  private final AtomicLong counter = new AtomicLong(0);

  //This is the operation this Extension provides
  public long increment() {
    return counter.incrementAndGet();
  }
}
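
The thread-safety comment in the listing above can be checked without Akka. The following sketch copies the counting logic into a plain class and calls it from several threads at once; SharedCounter and SharedCounterDemo are hypothetical names used only for this illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Same counting logic as CountExtensionImpl, minus the Akka Extension interface.
class SharedCounter {
  private final AtomicLong counter = new AtomicLong(0);

  long increment() {
    return counter.incrementAndGet();
  }
}

class SharedCounterDemo {
  // Runs `threads` workers that each call increment() `perThread` times
  // and returns how many increments the shared counter recorded.
  static long hammer(int threads, int perThread) {
    SharedCounter shared = new SharedCounter();
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    for (int t = 0; t < threads; t++) {
      pool.execute(() -> {
        for (int i = 0; i < perThread; i++) {
          shared.increment();
        }
      });
    }
    pool.shutdown();
    try {
      if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
        throw new IllegalStateException("workers did not finish in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting", e);
    }
    // One extra increment reads the total; subtract it again.
    return shared.increment() - 1;
  }

  public static void main(String[] args) {
    System.out.println(hammer(8, 10_000)); // 80000
  }
}
```

With a plain long field and counter++ instead of AtomicLong, concurrent updates could be lost and the total could come out lower than expected.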

Then we need to create an ExtensionId for our extension so we can get hold of it.

import akka.actor.*;
public class CountExtension extends AbstractExtensionId<CountExtensionImpl>
  implements ExtensionIdProvider {
  //This will be the identifier of our CountExtension
  public final static CountExtension CountExtensionProvider = new CountExtension();

  private CountExtension() {}

  //The lookup method is required by ExtensionIdProvider,
  // so we return ourselves here, this allows us
  // to configure our extension to be loaded when
  // the ActorSystem starts up
  public CountExtension lookup() {
    return CountExtension.CountExtensionProvider; //The public static final
  }

  //This method will be called by Akka
  // to instantiate our Extension
  public CountExtensionImpl createExtension(ExtendedActorSystem system) {
    return new CountExtensionImpl();
  }
}

Wicked! Now all we need to do is to actually use it:

// typically you would use static import of the
// CountExtension.CountExtensionProvider field
CountExtension.CountExtensionProvider.get(system).increment();

Or from inside of an Akka Actor:

public class MyActor extends UntypedActor {
  public void onReceive(Object msg) {
    // typically you would use static import of the
    // CountExtension.CountExtensionProvider field
    CountExtension.CountExtensionProvider.get(getContext().system()).increment();
  }
}

That's all there is to it!

Loading from Configuration

To load extensions from your Akka configuration, add the fully qualified class names (FQCNs) of implementations of either ExtensionId or ExtensionIdProvider to the "akka.extensions" section of the configuration you provide to your ActorSystem.

akka {
  extensions = ["docs.extension.ExtensionDocTest.CountExtension"]
}
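
Because the configuration holds only a class name, something has to turn that name back into the shared id object, and that is where a lookup() method becomes useful. The sketch below models one plausible way to do this with plain reflection. It is an assumption for illustration and not Akka's actual loading code; MiniIdProvider, MiniCountId and NameLoader are hypothetical names.

```java
import java.lang.reflect.Constructor;

// Hypothetical stand-in for ExtensionIdProvider.
interface MiniIdProvider {
  Object lookup();
}

// Mirrors the shape of CountExtension: private constructor plus a shared instance.
class MiniCountId implements MiniIdProvider {
  static final MiniCountId INSTANCE = new MiniCountId();

  private MiniCountId() {}

  @Override
  public Object lookup() {
    return INSTANCE;
  }
}

class NameLoader {
  // Turns a class name taken from configuration into the canonical id object.
  static Object resolve(String className) {
    try {
      Constructor<?> ctor = Class.forName(className).getDeclaredConstructor();
      ctor.setAccessible(true); // the constructor is private
      Object provider = ctor.newInstance();
      if (!(provider instanceof MiniIdProvider)) {
        throw new IllegalArgumentException(className + " is not a MiniIdProvider");
      }
      // The throwaway instance hands back the shared one.
      return ((MiniIdProvider) provider).lookup();
    } catch (ReflectiveOperationException e) {
      throw new IllegalArgumentException("Cannot load " + className, e);
    }
  }

  public static void main(String[] args) {
    Object id = resolve(MiniCountId.class.getName());
    System.out.println(id == MiniCountId.INSTANCE); // true
  }
}
```

In this model the reflectively created object is discarded immediately, so code that later asks for the extension through the static field finds the same registration.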

Applicability

The sky is the limit! By the way, did you know that Akka's Typed Actors, Serialization and other features are implemented as Akka Extensions?

Application-specific settings

The configuration can be used for application-specific settings. A good practice is to place those settings in an Extension.

Sample configuration:

myapp {
  db {
    uri = "mongodb://example1.com:27017,example2.com:27017"
  }
  circuit-breaker {
    timeout = 30 seconds
  }
}

The Extension:

import akka.actor.Extension;
import akka.actor.AbstractExtensionId;
import akka.actor.ExtensionIdProvider;
import akka.actor.ActorSystem;
import akka.actor.ExtendedActorSystem;
import scala.concurrent.duration.Duration;
import com.typesafe.config.Config;
import java.util.concurrent.TimeUnit;
public class SettingsImpl implements Extension {

  public final String DB_URI;
  public final Duration CIRCUIT_BREAKER_TIMEOUT;

  public SettingsImpl(Config config) {
    DB_URI = config.getString("myapp.db.uri");
    CIRCUIT_BREAKER_TIMEOUT =
      Duration.create(config.getDuration("myapp.circuit-breaker.timeout", 
        TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
  }

}

public class Settings extends AbstractExtensionId<SettingsImpl>
  implements ExtensionIdProvider {
  public final static Settings SettingsProvider = new Settings();

  private Settings() {}

  public Settings lookup() {
    return Settings.SettingsProvider;
  }

  public SettingsImpl createExtension(ExtendedActorSystem system) {
    return new SettingsImpl(system.settings().config());
  }
}
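
One benefit of this pattern is that every setting is read and converted once, when the extension is created, so a missing or malformed value surfaces at that point and not deep inside an actor. The sketch below shows the same idea with JDK types only. It is an assumption-laden stand-in: AppSettings and AppSettingsDemo are hypothetical, a Map replaces com.typesafe.config.Config, and the timeout is stored under a made-up key as plain milliseconds.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

// Hypothetical settings holder; a Map stands in for com.typesafe.config.Config.
final class AppSettings {
  final String dbUri;
  final Duration circuitBreakerTimeout;

  AppSettings(Map<String, String> config) {
    dbUri = required(config, "myapp.db.uri");
    // Assumption: the stand-in map stores the timeout as plain milliseconds.
    circuitBreakerTimeout = Duration.ofMillis(
        Long.parseLong(required(config, "myapp.circuit-breaker.timeout-ms")));
  }

  // Fails fast with a clear message when a key is absent.
  private static String required(Map<String, String> config, String key) {
    String value = config.get(key);
    if (value == null) {
      throw new IllegalArgumentException("Missing setting: " + key);
    }
    return value;
  }
}

class AppSettingsDemo {
  static Map<String, String> sampleConfig() {
    Map<String, String> config = new HashMap<>();
    config.put("myapp.db.uri", "mongodb://example1.com:27017,example2.com:27017");
    config.put("myapp.circuit-breaker.timeout-ms", "30000");
    return config;
  }

  public static void main(String[] args) {
    AppSettings settings = new AppSettings(sampleConfig());
    System.out.println(settings.dbUri);
    System.out.println(settings.circuitBreakerTimeout.getSeconds()); // 30
  }
}
```

The fields are final and set only in the constructor, so the resulting object can be shared between actors without extra synchronization.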

Use it:

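public class MyActor extends UntypedActor {
  // typically you would use static import of the Settings.SettingsProvider field
  final SettingsImpl settings =
    Settings.SettingsProvider.get(getContext().system());
  // Connection and connect(...) are placeholders for your application's
  // own database client; they are not part of Akka
  Connection connection =
    connect(settings.DB_URI, settings.CIRCUIT_BREAKER_TIMEOUT);

  @Override
  public void onReceive(Object msg) {
    // handle messages here, using the connection as needed
  }
}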
