日頃の行い

個人的な日頃の行いをつらつら書いてます\\\\ ٩( 'ω' )و ////

localstack上でSQS->Lambda(Scala製)の連携を動かしてみる

開発時にlocalstackをAmazon Resourceのモックとして使っていて、
前試してなんか動かなかったんだけど、ちゃんとやったら動いたのでそのメモです。

github.com

localstack上のSQSにメッセージを送ったら、localstack上でmappingされたScala製のLambdaがキックされる様子を観測するのがゴールです。
検証用に使ったコードはこちらです。

github.com

登場するコマンドたちのversion

$aws --version
aws-cli/1.16.260 Python/3.7.4 Darwin/18.7.0 botocore/1.12.250

$docker-compose --version
docker-compose version 1.24.1, build 4667896b

1. localstackの起動

docker-composeを利用しつつlocalstackのSQSとLambdaを起動します。
利用したlocalstackのversionは 0.10.6 です。
networksは内部で別のコンテナと通信したいときに指定するために使います。
(実は今回は使ってないですごめんね)

docker-compose.yml

version: "2"

services:
    localstack:
        image: localstack/localstack:0.10.6
        ports:
          - "4567-4597:4567-4597"
        environment:
            LAMBDA_EXECUTOR: docker
            DOCKER_HOST: unix:///var/run/docker.sock
            DATA_DIR: /tmp/localstack/data
            SERVICES: sqs,lambda
            DEBUG: 1
            # 内部APIを叩きたいときなどに使う
            LAMBDA_DOCKER_NETWORK: localstack-sqs-lambda_foo_network
        volumes:
          - "/var/run/docker.sock:/var/run/docker.sock"
        networks:
            foo_network:

networks:
    foo_network:

2. Lambda Handlerの作成

特に強い意図はないのですが仕事で触れていたのがScalaだったのでScalaで書いています。
HelloWorldとメッセージに含まれるBodyを表示する感じです。

scala/src/main/scala/com/ru/waka/FooHandler.scala

package com.ru.waka

import com.amazonaws.services.lambda.runtime.{Context, RequestHandler}
import com.amazonaws.services.lambda.runtime.events.SQSEvent
import scala.collection.JavaConverters._

class FooHandler extends RequestHandler[SQSEvent, Unit] {
  override def handleRequest(input: SQSEvent, context: Context): Unit = {
    context.getLogger.log("Hello World from Scala Code\n")
    context.getLogger.log("Message's body is...\n")
    input.getRecords.asScala.foreach(r => {
      context.getLogger.log(s"${r.getBody}")
    })
  }
}

Lambdaにわたす際のビルドにはsbt-assemblyを利用しています。

github.com

ビルドの様子

$make -C scala build
# Lambdaのコードビルド
make[1]: Entering directory '/Users/arata/.ghq/github.com/ara-ta3/localstack-sqs-lambda/scala'
./tools/sbt/bin/sbt assembly
[info] Loading settings for project scala-build from build.sbt ...
[info] Loading project definition from /Users/arata/.ghq/github.com/ara-ta3/localstack-sqs-lambda/scala/project
[info] Loading settings for project scala from build.sbt ...
[info] Set current project to scala (in build file:/Users/arata/.ghq/github.com/ara-ta3/localstack-sqs-lambda/scala/)
[info] Strategy 'discard' was applied to 32 files (Run the task at debug level to see details)
[info] Strategy 'filterDistinctLines' was applied to a file (Run the task at debug level to see details)
[info] Assembly up to date: /Users/arata/.ghq/github.com/ara-ta3/localstack-sqs-lambda/scala/target/scala-2.12/scala-assembly-0.1.0-SNAPSHOT.jar
[success] Total time: 1 s, completed 2019/12/20 22:50:41
make[1]: Leaving directory '/Users/arata/.ghq/github.com/ara-ta3/localstack-sqs-lambda/scala'

3. SQSとLambdaの作成

次にlocalstack上にSQSとLambda、そしてそのマッピングを作成します。
Lambda作成時には 2. でビルドした成果物のjarファイルを上げています。

# SQSの作成
$aws --endpoint-url http://localhost:4576 \
    sqs create-queue --queue-name 'foo-queue'
{
    "QueueUrl": "http://localhost:4576/queue/foo-queue"
}

# Lambdaの作成
$aws --endpoint-url http://localhost:4574 \
    lambda create-function \
    --function-name 'foo-function' \
    --runtime=java8 \
    --role=dummyrole \
    --handler=com.ru.waka.FooHandler \
    --zip-file=fileb://./scala/target/scala-2.12/scala-assembly-0.1.0-SNAPSHOT.jar
{
    "FunctionName": "foo-function",
    "FunctionArn": "arn:aws:lambda:us-east-1:000000000000:function:foo-function",
    "Runtime": "java8",
    "Role": "dummyrole",
    "Handler": "com.ru.waka.FooHandler",
    "CodeSize": 10857096,
    "Description": "",
    "Timeout": 3,
    "LastModified": "2019-12-20T13:50:43.397+0000",
    "CodeSha256": "VJ4FGoNtdijTBO5acf/pu5WW/K/FQFytSH+x9WprcQc=",
    "Version": "$LATEST",
    "TracingConfig": {
        "Mode": "PassThrough"
    },
    "RevisionId": "fbaff8d2-6ac0-4498-879a-57b87cdd1466"
}

# SQSとLambdaのマッピング
$aws --endpoint-url http://localhost:4574 \
    lambda create-event-source-mapping \
    --event-source-arn arn:aws:sqs:us-east-1:000000000000:foo-queue  \
    --function-name "foo-function"
{
    "UUID": "b07dc12c-5a74-490b-82b4-1e43abdb6761",
    "BatchSize": 100,
    "EventSourceArn": "arn:aws:sqs:us-east-1:000000000000:foo-queue",
    "FunctionArn": "arn:aws:lambda:us-east-1:000000000000:function:foo-function",
    "LastModified": 1576849845.0,
    "LastProcessingResult": "OK",
    "State": "Enabled",
    "StateTransitionReason": "User action"
}

これでSQSにメッセージを送ればLambdaがキックされてHello Worldが表示されるはずです。

4. メッセージを送る

aws cliからメッセージを送ってみました。
そうするとdocker-composeのログにINFOログが流れてきたのがわかります。

$aws --region us-east-1 --endpoint-url http://localhost:4576 \
    sqs send-message \
    --queue-url 'http://localstack:4576/queue/foo-queue' \
    --message-body 'hogehoge'
{
    "MD5OfMessageBody": "329435e5e66be809a656af105f42401e",
    "MD5OfMessageAttributes": "d41d8cd98f00b204e9800998ecf8427e",
    "MessageId": "c3aa9a0c-9c3d-4b61-a426-9d45a2b2595f"
}

# docker-composeのログ
localstack_1  | 13:50:42.627 [elasticmq-akka.actor.default-dispatcher-4] INFO  o.elasticmq.actor.QueueManagerActor - Creating queue QueueData(foo-queue,MillisVisibilityTimeout(30000),PT0S,PT0S,2019-12-20T13:50:42.597Z,2019-12-20T13:50:42.597Z,None,false,false,None,None,Map())
localstack_1  | 2019-12-20T13:53:26:DEBUG:localstack.services.awslambda.lambda_api: Found 1 source mappings for event from SQS queue arn:aws:sqs:us-east-1:000000000000:foo-queue: ['arn:aws:lambda:us-east-1:000000000000:function:foo-function']
localstack_1  | 2019-12-20T13:53:26:DEBUG:localstack.services.awslambda.lambda_executors: Running lambda cmd: CONTAINER_ID="$(docker create -i  --entrypoint ""  -e DOCKER_LAMBDA_USE_STDIN="$DOCKER_LAMBDA_USE_STDIN" -e HOSTNAME="$HOSTNAME" -e LOCALSTACK_HOSTNAME="$LOCALSTACK_HOSTNAME" -e AWS_LAMBDA_FUNCTION_NAME="$AWS_LAMBDA_FUNCTION_NAME" -e AWS_LAMBDA_FUNCTION_VERSION="$AWS_LAMBDA_FUNCTION_VERSION" -e AWS_LAMBDA_FUNCTION_INVOKED_ARN="$AWS_LAMBDA_FUNCTION_INVOKED_ARN" --network="localstack-sqs-lambda_foo_network" --rm "lambci/lambda:java8" bash -c 'cd /var/task; java  -cp ".:localstack-utils-fat.jar" "cloud.localstack.LambdaExecutor" "com.ru.waka.FooHandler" "event_file.json"')";docker cp "/tmp/localstack/zipfile.2c9c80b9/." "$CONTAINER_ID:/var/task"; docker start -ai "$CONTAINER_ID";
localstack_1  | 2019-12-20T13:53:29:DEBUG:localstack.services.awslambda.lambda_executors: Lambda arn:aws:lambda:us-east-1:000000000000:function:foo-function result / log output:
localstack_1  | ()
localstack_1  | >Dec 20, 2019 1:53:28 PM cloud.localstack.LambdaContext$1 log
localstack_1  | > INFO: Hello World from Scala Code
localstack_1  | >
localstack_1  | > Dec 20, 2019 1:53:28 PM cloud.localstack.LambdaContext$1 log
localstack_1  | > INFO: Message's body is...
localstack_1  | >
localstack_1  | > Dec 20, 2019 1:53:28 PM cloud.localstack.LambdaContext$1 log
localstack_1  | > INFO: hogehoge

ハマったところ

aws cliからメッセージを送った際、Regionが $HOME/.aws/config に記述されているデフォルトのRegionを見ていて ap-northeast-1 になっていました。
しかし、アプリケーション(Java SDK)から叩いた際にRegionを ap-northeast-1 に指定しても利用されるSQSは us-east-1 になってしまいました。
そのため、上記では us-east-1 を利用するようにしています。
他の言語のSDKがどうなっているかはわからないですが、ローカルでの開発用途になると思いますし、localstackに対してaws cliで叩くときにregionのオプションをつければいいだけになるので us-east-1 でとりあえず逃げることはできそうですね。

Java SDKから叩いた際のコード

scala/src/main/scala/com/ru/waka/SQSMessageSender.scala

package com.ru.waka

import com.amazonaws.regions.Regions
import com.amazonaws.services.sqs.AmazonSQSClientBuilder
import com.amazonaws.services.sqs.model.SendMessageRequest

object SQSMessageSender {
  def main(args: Array[String]): Unit = {
    val sqs = AmazonSQSClientBuilder
      .standard()
      .withRegion(Regions.AP_NORTHEAST_1)
      .build()
    val req = new SendMessageRequest()
        .withQueueUrl("http://localhost:4576/queue/foo-queue")
        .withMessageBody("Message from Scala Code")
    sqs.sendMessage(req)
  }
}