【AWS】DynamoDBを理解する【クエリ編】

 

DynamoDBからデータを読み書きする

  • LambdaからDynamoDBへアクセスしてデータの読み書きを行う

  • Lambdaはpythonで実装

  • API Gateway→Lambda→DynamoDBの処理を想定

     

 

f:id:kanmi-program:20201029213504p:plain

 

 

<参考>

DynamoDB boto3 API

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#client

 

読み込み

プライマリキーでデータを取得

import json
import boto3
from boto3.dynamodb.conditions import Key
import logging

# ロガーの設定
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
   # リクエストから検索条件を取得
   orderId = event['queryStringParameters']['orderId']
   
   # DynamoDBを取得
   dynamoDB = boto3.resource('dynamodb')
   
   # DynamoDBのテーブル名
   table = dynamoDB.Table('History')
   
   try:
       # DynamoDBへクエリ実行
       queryData = table.get_item(Key={'orderId' : orderId})
       
   except Exception as e:
       # エラーをCloudWatchに出力
       logger.error(e)
       return {
           'statusCode': 500,
           'body': json.dumps("Internal Server Error.")
      }
   
   # 検索結果を取得
   orderInfo = queryData['Item']
   
   if orderInfo is None:
       return {
           'statusCode': 404,
           'body': json.dumps('Order Not Found.')
      }

   # 正常終了
   return {
       'statusCode': 200,
       'headers': { "Content-Type": "application/json; charset=UTF-8"},
       'body': json.dumps(orderInfo)
  }

 

強力な整合性のある読み込みをする場合は「ConsistentRead=true」を設定

queryData = table.get_item(
   Key={'Id' : userId},
   ConsistentRead=True
)

 

プライマリキーとソートキーでデータを取得

import json
import boto3
from boto3.dynamodb.conditions import Key
import logging

# ロガーの設定
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
   # リクエストから検索条件を取得
   name = event['queryStringParameters']['name']
   # 属性「order」がソートキーの場合
   order = event['queryStringParameters']['order']
   
   # DynamoDBを取得
   dynamoDB = boto3.resource('dynamodb')
   
   # DynamoDBのテーブル名
   table = dynamoDB.Table('History')
   
   try:
       # DynamoDBへクエリ実行
       # プライマリキー、ソートキーを指定する
       queryData = table.get_item(Key={'name' : name, 'order': order})
       
   except Exception as e:
       # エラーをCloudWatchに出力
       logger.error(e)
       return {
           'statusCode': 500,
           'body': json.dumps("Internal Server Error.")
      }
   
   # 検索結果を取得
   orderInfo = queryData['Item']
   
   if orderInfo is None:
       return {
           'statusCode': 404,
           'body': json.dumps('Order Not Found.')
      }

   # 正常終了
   return {
       'statusCode': 200,
       'headers': { "Content-Type": "application/json; charset=UTF-8"},
       'body': json.dumps(orderInfo)
  }

 

 

セカンダリインデックスでデータを取得

import json
import boto3
from boto3.dynamodb.conditions import Key
import logging

# ロガーの設定
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
   # リクエストから検索条件を取得
   order = event['queryStringParameters']['order']
   
   # DynamoDBを取得
   dynamoDB = boto3.resource('dynamodb')
   
   # DynamoDBのテーブル名
   table = dynamoDB.Table('History')
   
   try:
       # DynamoDBへクエリ実行
       queryData = table.query(
           IndexName='order-index',
           KeyConditionExpression=Key('order').eq(order)
      )
       
   except Exception as e:
       return {
       # エラーをCloudWatchに出力
     logger.error(e)
           'statusCode': 500,
           'body': json.dumps("Internal Server Error.")
      }
   
   # 検索結果を取得
   orderInfoList = queryData['Items']
   
   if not orderInfoList:
       return {
           'statusCode': 404,
           'body': json.dumps('Users Not Found.')
      }
   
   # 正常終了
   return {
       'statusCode': 200,
       'headers': { "Content-Type": "application/json; charset=UTF-8"},
       'body': json.dumps(orderInfoList)
  }

 

全件取得

最大1MBのデータを取得可能。

それ以上は再度取得処理を実行する。

import json
import boto3
from boto3.dynamodb.conditions import Key
import logging

# ロガーの設定
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
   
   # DynamoDBを取得
   dynamoDB = boto3.resource('dynamodb')
   
   # DynamoDBのテーブル名
   table = dynamoDB.Table('History')
   
   try:
       # DynamoDBへクエリ実行
       queryData = table.scan()
       
   except Exception as e:
       # エラーをCloudWatchに出力
       logger.error(e)
       return {
           'statusCode': 500,
           'body': json.dumps("Internal Server Error.")
      }
   
   # 検索結果を取得
   orderInfo = queryData['Items']

   logger.info(orderInfo)

   # 正常終了
   return {
       'statusCode': 200,
       'headers': { "Content-Type": "application/json; charset=UTF-8"},
       'body': json.dumps(orderInfo)
  }

 

データが上限を超える場合「LastEvaluatedKey」を使って再度取得する。

import json
import boto3
from boto3.dynamodb.conditions import Key
import logging

# ロガーの設定
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
   
   # DynamoDBを取得
   dynamoDB = boto3.resource('dynamodb')
   
   # DynamoDBのテーブル名
   table = dynamoDB.Table('History')
   
   try:
       # DynamoDBへクエリ実行
       queryData = table.scan()

       # 検索結果を取得
       orderInfo = queryData['Items']

       # 取得上限を超えた場合、再度クエリを実行
       while 'LastEvaluatedKey' in queryData:
           queryData = table.scan(
               ExclusiveStartKey = queryData['LastEvaluatedKey']
          )
           # 検索結果をマージ
           orderInfo += queryData['Items']
       
   except Exception as e:
       # エラーをCloudWatchに出力
       logger.error(e)
       return {
           'statusCode': 500,
           'body': json.dumps("Internal Server Error.")
      }

   # 正常終了
   return {
       'statusCode': 200,
       'headers': { "Content-Type": "application/json; charset=UTF-8"},
       'body': json.dumps(orderInfo)
  }

 

取得したデータを絞り込む

指定したキーでデータを取得した後に絞り込むため、

DynamoDBからの転送量やキャパシティユニットの使用量は減らない。

scanを使用した場合もscan結果に対してFilter処理を実行するため、最大1MBの制限は変わらない。

import json
import boto3
from boto3.dynamodb.conditions import Key, Attr
import logging

# ロガーの設定
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
   # リクエストから検索条件を取得
   order = event['queryStringParameters']['order']
   date = event['queryStringParameters']['date']
   
   # DynamoDBを取得
   dynamoDB = boto3.resource('dynamodb')
   
   # DynamoDBのテーブル名
   table = dynamoDB.Table('History')
   
   try:
       # DynamoDBへクエリ実行
       queryData = table.query(
           IndexName='order-index',
           KeyConditionExpression=Key('order').eq(order),
           FilterExpression=Attr('date').eq(date)
      )
       
   except Exception as e:
       # エラーをCloudWatchに出力
       logger.error(e)
       return {
           'statusCode': 500,
           'body': json.dumps("Internal Server Error.")
      }
   
   # 検索結果を取得
   orderInfoList = queryData['Items']
   
   if not orderInfoList:
       return {
           'statusCode': 404,
           'body': json.dumps('Order Not Found.')
      }
   
   # 正常終了
   return {
       'statusCode': 200,
       'headers': { "Content-Type": "application/json; charset=UTF-8"},
       'body': json.dumps(orderInfoList)
  }

 

書き込み

プライマリキーのデータが存在しなかったら追加、すでにある場合は更新。

import json
import boto3
from boto3.dynamodb.conditions import Key
import logging

# ロガーの設定
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
   # リクエストボディから登録データを取得
   body = json.loads(event['body'])
   orderId = body['orderId']
   name = body['name']
   order = body['order']
   date = body['date']
   
   # DynamoDBを取得
   dynamoDB = boto3.resource('dynamodb')
   
   # DynamoDBのテーブル名
   table = dynamoDB.Table('History')
   
   try:
       # DynamoDBへクエリ実行
       table.put_item(
Item={
               'orderId': orderId,
               'name': name,
               'order': order,
               'date': date
          }
      )
       
   except Exception as e:
       # エラーをCloudWatchに出力
       logger.error(e)
       return {
           'statusCode': 500,
           'body': json.dumps('Internal Server Error.')
      }
   

   # 正常終了
   return {
       'statusCode': 200,
       'headers': { "Content-Type": "application/json; charset=UTF-8"},
       'body': json.dumps('OK.')
  }

 

データの登録イメージ

 

f:id:kanmi-program:20201029213500p:plain

 

複数の書き込み

import json
import boto3
from boto3.dynamodb.conditions import Key
import logging
import random

# ロガーの設定
logger = logging.getLogger()
logger.setLevel(logging.INFO)


def lambda_handler(event, context):
   # リクエストボディから登録データを取得
   body = json.loads(event['body'])
   name = body['name']
   order = body['order']
   date = body['date']

   # DynamoDBを取得
   dynamoDB = boto3.resource('dynamodb')

   # DynamoDBのテーブル名
   table = dynamoDB.Table('History')

   try:
       # DynamoDBへクエリ実行
       with table.batch_writer() as batch:
           for i in range(10):
               batch.put_item(
                   Item={
                       # とりあえずIDは乱数で生成して0埋め
                       'orderId': str(random.randint(1, 9999)).zfill(4),
                       'name': name,
                       'order': order,
                       'date': date
                  }
              )

   except Exception as e:
       # エラーをCloudWatchに出力
       logger.error(e)
       return {
           'statusCode': 500,
           'body': json.dumps('Internal Server Error.')
      }

   # 正常終了
   return {
       'statusCode': 200,
       'headers': {"Content-Type": "application/json; charset=UTF-8"},
       'body': json.dumps('OK.')
  }