Kim Rudolph

Spring Boot Meets Akka

Reading something about Java and the Actor model will sooner or later lead to Akka. There is great documentation for both the Scala and the Java API to get started with the Akka toolkit. This application is an experiment to build a small example of the Java API in combination with the Spring Framework.

The application should spawn some actors that asynchronously write several messages to a database, and shut down after all messages are processed.

Spring Boot is used to simplify the test application configuration, including packaging and startup/shutdown behaviour. See the repository for class imports and more code comments.

Enable Spring Configuration

Akka needs an extension to provide a Java API for actors created in the Spring context. A SpringActorProducer defines how to instantiate an actor in the dependency injection framework.

public class SpringActorProducer implements IndirectActorProducer {

    private final ApplicationContext applicationContext;
    private final String actorBeanName;

    public SpringActorProducer(ApplicationContext applicationContext,
        String actorBeanName) {
        this.applicationContext = applicationContext;
        this.actorBeanName = actorBeanName;
    }

    @Override
    public Actor produce() {
        return (Actor) applicationContext.getBean(actorBeanName);
    }

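    @Override
    @SuppressWarnings("unchecked")
    public Class<? extends Actor> actorClass() {
        // The bean is expected to be an actor, so the unchecked cast is safe
        return (Class<? extends Actor>) applicationContext.getType(actorBeanName);
    }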
}

The producer is invoked by the SpringExtension, which integrates the Spring ApplicationContext in the Akka ActorSystem.

@Component
public class SpringExtension implements Extension {

    private ApplicationContext applicationContext;

    public void initialize(ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }

    public Props props(String actorBeanName) {
        return Props.create(SpringActorProducer.class,
            applicationContext, actorBeanName);
    }
}

Putting it all together, an application @Configuration class exposes the Akka ActorSystem as a bean and initializes the SpringExtension with the ApplicationContext. The default Akka configuration is read from src/main/resources/application.conf. An in-memory database connection is configured as the persistence layer.

@Configuration
@Lazy
@ComponentScan(basePackages = { "de.kimrudolph.akkaflow.services",
    "de.kimrudolph.akkaflow.actors", "de.kimrudolph.akkaflow.extension" })
public class ApplicationConfiguration {

    @Autowired
    private ApplicationContext applicationContext;

    @Autowired
    private SpringExtension springExtension;

    @Bean
    public ActorSystem actorSystem() {

        ActorSystem system = ActorSystem
            .create("AkkaTaskProcessing", akkaConfiguration());
        springExtension.initialize(applicationContext);
        return system;
    }

    @Bean
    public Config akkaConfiguration() {
        return ConfigFactory.load();
    }

    @Bean
    public JdbcTemplate jdbcTemplate() throws Exception {
        // Simple connection pooling setup. See source code for details.
        ...
    }
}

The Task for Akka

For the test use case, a simple Task bean with a payload and a priority is used. The payload is the data that is actually transferred, and the priority allows the messages to be forced into a specific order.

public class Task {

    private String payload;

    private Integer priority;

    // Getter and Setter...
}

All tasks will be persisted in an in-memory database.

@Repository
public class TaskDAO {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    public long createTask(final Task task) {

        KeyHolder holder = new GeneratedKeyHolder();

        jdbcTemplate.update(new PreparedStatementCreator() {

            @Override
            public PreparedStatement createPreparedStatement(
                Connection connection) throws SQLException {
                PreparedStatement ps = connection
                    .prepareStatement("INSERT INTO tasks (payload, updated" +
                        ") VALUES(?, NOW())",
                        Statement.RETURN_GENERATED_KEYS);
                ps.setString(1, task.getPayload());
                return ps;
            }
        }, holder);

        return holder.getKey().longValue();
    }
}

Define Actors

A TaskActor handles only Task objects and simply forwards them to the DAO. The actor is defined in prototype scope to prevent Spring from treating it as a singleton.

@Component
@Scope("prototype")
public class TaskActor extends UntypedActor {

    private final LoggingAdapter log = Logging
        .getLogger(getContext().system(), "TaskProcessor");

    @Autowired
    private TaskDAO taskDAO;

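    @Override
    public void onReceive(Object message) throws Exception {

        if (message instanceof Task) {
            Long result = taskDAO.createTask((Task) message);
            log.debug("Created task {}", result);
        } else {
            // Anything that is not a Task is reported as unhandled
            unhandled(message);
        }
    }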
}
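
The prototype scope matters because Akka's IndirectActorProducer contract expects produce() to return a fresh actor instance on every call. The difference to a singleton can be illustrated with a minimal sketch that uses no Spring at all. ScopeSketch, Registry and Worker are hypothetical names invented for this illustration and only imitate the lookup behaviour.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-alone sketch; it imitates bean scopes without Spring.
final class ScopeSketch {

    // Stand-in for an actor class such as TaskActor.
    static final class Worker {
    }

    // Miniature registry: every bean name maps to a factory.
    static final class Registry {
        private final Map<String, Supplier<Object>> factories = new HashMap<>();

        // Singleton scope: every lookup returns the one shared instance.
        void registerSingleton(String name, Object instance) {
            factories.put(name, () -> instance);
        }

        // Prototype scope: every lookup runs the factory again.
        void registerPrototype(String name, Supplier<Object> factory) {
            factories.put(name, factory);
        }

        Object getBean(String name) {
            return factories.get(name).get();
        }
    }

    public static void main(String[] args) {
        Registry registry = new Registry();
        registry.registerSingleton("shared", new Worker());
        registry.registerPrototype("taskActor", Worker::new);

        // true: both lookups see the same object
        System.out.println(registry.getBean("shared") == registry.getBean("shared"));
        // false: each lookup creates a new object
        System.out.println(registry.getBean("taskActor") == registry.getBean("taskActor"));
    }
}
```

With a singleton, a restarted or additional actor would receive the same object again, which is exactly what the producer must avoid.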

TaskActor instances are managed by an actor called Supervisor. It defines a Router which is prefilled with several TaskActors as Routees. The SmallestMailboxRoutingLogic used here selects a routee for each message by applying the following rules in order:

  1. pick any idle routee (not processing a message) with an empty mailbox
  2. pick any routee with an empty mailbox
  3. pick the routee with the fewest pending messages in its mailbox
  4. pick any remote routee; remote actors are considered lowest priority, since their mailbox size is unknown

The Supervisor watches every routee. If one of them stops, the Supervisor receives a Terminated message, removes that routee from the router and adds a new instance in its place.

@Component
@Scope("prototype")
public class Supervisor extends UntypedActor {

    private final LoggingAdapter log = Logging
        .getLogger(getContext().system(), "Supervisor");

    @Autowired
    private SpringExtension springExtension;

    private Router router;

    @Override
    public void preStart() throws Exception {

        log.info("Starting up");

        List<Routee> routees = new ArrayList<Routee>();
        for (int i = 0; i < 100; i++) {
            ActorRef actor = getContext().actorOf(springExtension.props(
                "taskActor"));
            getContext().watch(actor);
            routees.add(new ActorRefRoutee(actor));
        }
        router = new Router(new SmallestMailboxRoutingLogic(), routees);
        super.preStart();
    }

    @Override
    public void onReceive(Object message) throws Exception {

        if (message instanceof Task) {
            router.route(message, getSender());
        } else if (message instanceof Terminated) {
            // Replace the terminated task actor with a fresh instance
            router = router.removeRoutee(((Terminated) message).actor());
            ActorRef actor = getContext().actorOf(springExtension.props
                ("taskActor"));
            getContext().watch(actor);
            router = router.addRoutee(new ActorRefRoutee(actor));
        } else {
            log.error("Unable to handle message {}", message);
        }
    }

    @Override
    public void postStop() throws Exception {
        log.info("Shutting down");
        super.postStop();
    }
}
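
The four routing rules listed before the Supervisor class can be made concrete with a small stand-alone sketch. It is a simplification written for this article and not Akka's SmallestMailboxRoutingLogic implementation. SmallestMailboxSketch, RouteeState and the scoring scheme are assumptions chosen only to show the order in which the rules apply.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-alone sketch; not Akka's SmallestMailboxRoutingLogic.
final class SmallestMailboxSketch {

    // Snapshot of what a router would know about one routee.
    static final class RouteeState {
        final String name;
        final boolean remote;      // true: mailbox size cannot be inspected
        final boolean processing;  // true: currently handling a message
        final int pending;         // messages waiting in the mailbox

        RouteeState(String name, boolean remote, boolean processing,
            int pending) {
            this.name = name;
            this.remote = remote;
            this.processing = processing;
            this.pending = pending;
        }
    }

    // Maps the four rules to a score; the lowest score wins.
    static long score(RouteeState routee) {
        if (routee.remote) {
            return Long.MAX_VALUE;             // rule 4: last resort
        }
        if (routee.pending == 0) {
            return routee.processing ? 1 : 0;  // rule 1 before rule 2
        }
        return 1L + routee.pending;            // rule 3: fewest pending
    }

    // Returns the first routee with the lowest score, or null if empty.
    static RouteeState select(List<RouteeState> routees) {
        RouteeState best = null;
        for (RouteeState candidate : routees) {
            if (best == null || score(candidate) < score(best)) {
                best = candidate;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        List<RouteeState> routees = Arrays.asList(
            new RouteeState("remote", true, false, 0),
            new RouteeState("backlog", false, true, 3),
            new RouteeState("busy", false, true, 0),
            new RouteeState("idle", false, false, 0));
        System.out.println(select(routees).name); // prints "idle"
    }
}
```

Removing the idle routee from the list makes the sketch pick "busy" (rule 2), and with only "remote" and "backlog" left it picks "backlog" (rule 3).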

Custom Mailbox

Since Task and other messages should be processed with a configurable priority, a custom mailbox based on UnboundedPriorityMailbox defines the queue order. Messages of any other type default to the value 100, which is the lowest priority in this example.

public class PriorityMailbox extends UnboundedPriorityMailbox {

    public PriorityMailbox(ActorSystem.Settings settings, Config config) {

        // Create a new PriorityGenerator, a lower value means a higher priority
        super(new PriorityGenerator() {

            @Override
            public int gen(Object message) {
                if (message instanceof Task) {
                    return ((Task) message).getPriority();
                } else {
                    // default
                    return 100;
                }
            }
        });

    }
}
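
The ordering that such a priority generator produces can be tried out without Akka. The following is a minimal sketch that uses java.util.PriorityQueue as a stand-in for the mailbox. PriorityOrderSketch, its nested Task class and drainOrder are hypothetical names for this illustration, and the real mailbox is of course not drained like this.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical stand-alone sketch; it does not use Akka's mailbox classes.
final class PriorityOrderSketch {

    // Stand-in for the Task bean: only payload and priority are needed.
    static final class Task {
        final String payload;
        final int priority;

        Task(String payload, int priority) {
            this.payload = payload;
            this.priority = priority;
        }
    }

    // Same rule as the PriorityGenerator: tasks carry their own priority,
    // every other message falls back to 100.
    static int gen(Object message) {
        if (message instanceof Task) {
            return ((Task) message).priority;
        }
        return 100;
    }

    // Queues all messages and returns their labels in dequeue order.
    static List<String> drainOrder(List<Object> messages) {
        Comparator<Object> byPriority =
            Comparator.comparingInt(PriorityOrderSketch::gen);
        PriorityQueue<Object> queue = new PriorityQueue<>(byPriority);
        queue.addAll(messages);

        List<String> order = new ArrayList<>();
        while (!queue.isEmpty()) {
            Object next = queue.poll();
            order.add(next instanceof Task
                ? ((Task) next).payload : next.toString());
        }
        return order;
    }

    public static void main(String[] args) {
        List<Object> messages = new ArrayList<>();
        messages.add("stop");             // unknown type, priority 100
        messages.add(new Task("b", 42));
        messages.add(new Task("a", 7));
        System.out.println(drainOrder(messages)); // prints [a, b, stop]
    }
}
```

Although "stop" is enqueued first, it leaves the queue last. This is the property the application relies on when it sends a PoisonPill after tasks whose priorities are all below 100.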

Main Application

The actual application behaviour is defined in the AkkaApplication main class. Spring Boot manages the startup and the context, including the ActorSystem as the interface to Akka. An instance of the Supervisor actor is created with the SpringActorProducer and the custom PriorityMailbox. Several Tasks with random priorities are created and sent to the supervisor actor (increase the number to test scalability). After that, a final PoisonPill message is queued with the lowest default priority to trigger the shutdown of the Supervisor actor. Finally, the ActorSystem can safely shut down.

@Configuration
@EnableAutoConfiguration
@ComponentScan("de.kimrudolph.akkaflow.configuration")
public class AkkaApplication {

    public static void main(String[] args) throws Exception {

        ApplicationContext context =
            SpringApplication.run(AkkaApplication.class, args);

        ActorSystem system = context.getBean(ActorSystem.class);

        final LoggingAdapter log = Logging.getLogger(system, "Application");

        log.info("Starting up");

        SpringExtension ext = context.getBean(SpringExtension.class);

        // Use the Spring Extension to create props for a named actor bean
        ActorRef supervisor = system.actorOf(
            ext.props("supervisor").withMailbox("akka.priority-mailbox"));

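        // Random priorities from 0 to 98, all below the default of 100
        Random random = new Random();
        for (int i = 1; i < 1000; i++) {
            Task task = new Task();
            task.setPayload("payload " + i);
            task.setPriority(random.nextInt(99));
            supervisor.tell(task, ActorRef.noSender());
        }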

        // Poison pill will be queued with a priority of 100 as the last
        // message
        supervisor.tell(PoisonPill.getInstance(), ActorRef.noSender());

        while (!supervisor.isTerminated()) {
            Thread.sleep(100);
        }

        log.info("Created {} tasks", context.getBean(JdbcTemplate.class)
            .queryForObject("SELECT COUNT(*) FROM tasks", Integer.class));

        log.info("Shutting down");

        system.shutdown();
        system.awaitTermination();
    }
}

The main class has to be specified so that the packaged Java archive can be executed directly.

pom.xml

<build>
  <plugins>
    <plugin>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-maven-plugin</artifactId>
      <version>1.1.1.RELEASE</version>
      <configuration>
        <mainClass>de.kimrudolph.akkaflow.AkkaApplication</mainClass>
      </configuration>
      <executions>
        <execution>
          <goals>
            <goal>repackage</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>

Akka Configuration

The Akka configuration for this example defines which logging framework is used and registers the custom priority-mailbox.

src/main/resources/application.conf

akka {

  loggers = ["akka.event.slf4j.Slf4jLogger"]
  loglevel = "INFO"
  stdout-loglevel = "ERROR"

  priority-mailbox {
    mailbox-type = "de.kimrudolph.akkaflow.mailbox.PriorityMailbox"
  }

}

Starting the Application

Running mvn package builds the jar file. The application can then be started with java -jar target/akkaflow-1.0-SNAPSHOT.jar.

The console should print something like:

...
INFO  Application - Starting up
INFO  Supervisor - Starting up
INFO  Supervisor - Shutting down
INFO  Application - Created 999 tasks
INFO  Application - Shutting down
...

Source Code

The full application can be found at the akkaflow repository.