【AWS】DynamoDBを理解する【クエリ編】
<参考>
DynamoDB boto3 API
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#client
読み込み
プライマリキーでデータを取得
import json
import boto3
from boto3.dynamodb.conditions import Key
import logging
# ロガーの設定
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
# リクエストから検索条件を取得
orderId = event['queryStringParameters']['orderId']
# DynamoDBを取得
dynamoDB = boto3.resource('dynamodb')
# DynamoDBのテーブル名
table = dynamoDB.Table('History')
try:
# DynamoDBへクエリ実行
queryData = table.get_item(Key={'orderId' : orderId})
except Exception as e:
# エラーをCloudWatchに出力
logger.error(e)
return {
'statusCode': 500,
'body': json.dumps("Internal Server Error.")
}
# 検索結果を取得
orderInfo = queryData['Item']
if orderInfo is None:
return {
'statusCode': 404,
'body': json.dumps('Order Not Found.')
}
# 正常終了
return {
'statusCode': 200,
'headers': { "Content-Type": "application/json; charset=UTF-8"},
'body': json.dumps(orderInfo)
}
強力な整合性のある読み込みをする場合は「ConsistentRead=true」を設定
queryData = table.get_item(
Key={'Id' : userId},
ConsistentRead=True
)
プライマリキーとソートキーでデータを取得
import json
import boto3
from boto3.dynamodb.conditions import Key
import logging
# ロガーの設定
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
# リクエストから検索条件を取得
name = event['queryStringParameters']['name']
# 属性「order」がソートキーの場合
order = event['queryStringParameters']['order']
# DynamoDBを取得
dynamoDB = boto3.resource('dynamodb')
# DynamoDBのテーブル名
table = dynamoDB.Table('History')
try:
# DynamoDBへクエリ実行
# プライマリキー、ソートキーを指定する
queryData = table.get_item(Key={'name' : name, 'order': order})
except Exception as e:
# エラーをCloudWatchに出力
logger.error(e)
return {
'statusCode': 500,
'body': json.dumps("Internal Server Error.")
}
# 検索結果を取得
orderInfo = queryData['Item']
if orderInfo is None:
return {
'statusCode': 404,
'body': json.dumps('Order Not Found.')
}
# 正常終了
return {
'statusCode': 200,
'headers': { "Content-Type": "application/json; charset=UTF-8"},
'body': json.dumps(orderInfo)
}
セカンダリインデックスでデータを取得
import json
import boto3
from boto3.dynamodb.conditions import Key
import logging
# ロガーの設定
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
# リクエストから検索条件を取得
order = event['queryStringParameters']['order']
# DynamoDBを取得
dynamoDB = boto3.resource('dynamodb')
# DynamoDBのテーブル名
table = dynamoDB.Table('History')
try:
# DynamoDBへクエリ実行
queryData = table.query(
IndexName='order-index',
KeyConditionExpression=Key('order').eq(order)
)
except Exception as e:
return {
# エラーをCloudWatchに出力
logger.error(e)
'statusCode': 500,
'body': json.dumps("Internal Server Error.")
}
# 検索結果を取得
orderInfoList = queryData['Items']
if not orderInfoList:
return {
'statusCode': 404,
'body': json.dumps('Users Not Found.')
}
# 正常終了
return {
'statusCode': 200,
'headers': { "Content-Type": "application/json; charset=UTF-8"},
'body': json.dumps(orderInfoList)
}
全件取得
最大1MBのデータを取得可能。
それ以上は再度取得処理を実行する。
import json
import boto3
from boto3.dynamodb.conditions import Key
import logging
# ロガーの設定
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
# DynamoDBを取得
dynamoDB = boto3.resource('dynamodb')
# DynamoDBのテーブル名
table = dynamoDB.Table('History')
try:
# DynamoDBへクエリ実行
queryData = table.scan()
except Exception as e:
# エラーをCloudWatchに出力
logger.error(e)
return {
'statusCode': 500,
'body': json.dumps("Internal Server Error.")
}
# 検索結果を取得
orderInfo = queryData['Items']
logger.info(orderInfo)
# 正常終了
return {
'statusCode': 200,
'headers': { "Content-Type": "application/json; charset=UTF-8"},
'body': json.dumps(orderInfo)
}
データが上限を超える場合「LastEvaluatedKey」を使って再度取得する。
import json
import boto3
from boto3.dynamodb.conditions import Key
import logging
# ロガーの設定
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
# DynamoDBを取得
dynamoDB = boto3.resource('dynamodb')
# DynamoDBのテーブル名
table = dynamoDB.Table('History')
try:
# DynamoDBへクエリ実行
queryData = table.scan()
# 検索結果を取得
orderInfo = queryData['Items']
# 取得上限を超えた場合、再度クエリを実行
while 'LastEvaluatedKey' in queryData:
queryData = table.scan(
ExclusiveStartKey = queryData['LastEvaluatedKey']
)
# 検索結果をマージ
orderInfo += queryData['Items']
except Exception as e:
# エラーをCloudWatchに出力
logger.error(e)
return {
'statusCode': 500,
'body': json.dumps("Internal Server Error.")
}
# 正常終了
return {
'statusCode': 200,
'headers': { "Content-Type": "application/json; charset=UTF-8"},
'body': json.dumps(orderInfo)
}
取得したデータを絞り込む
指定したキーでデータを取得した後に絞り込むため、
DynamoDBからの転送量やキャパシティユニットの使用量は減らない。
scanを使用した場合もscan結果に対してFilter処理を実行するため、最大1MBの制限は変わらない。
import json
import boto3
from boto3.dynamodb.conditions import Key, Attr
import logging
# ロガーの設定
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
# リクエストから検索条件を取得
order = event['queryStringParameters']['order']
date = event['queryStringParameters']['date']
# DynamoDBを取得
dynamoDB = boto3.resource('dynamodb')
# DynamoDBのテーブル名
table = dynamoDB.Table('History')
try:
# DynamoDBへクエリ実行
queryData = table.query(
IndexName='order-index',
KeyConditionExpression=Key('order').eq(order),
FilterExpression=Attr('date').eq(date)
)
except Exception as e:
# エラーをCloudWatchに出力
logger.error(e)
return {
'statusCode': 500,
'body': json.dumps("Internal Server Error.")
}
# 検索結果を取得
orderInfoList = queryData['Items']
if not orderInfoList:
return {
'statusCode': 404,
'body': json.dumps('Order Not Found.')
}
# 正常終了
return {
'statusCode': 200,
'headers': { "Content-Type": "application/json; charset=UTF-8"},
'body': json.dumps(orderInfoList)
}
書き込み
プライマリキーのデータが存在しなかったら追加、すでにある場合は更新。
import json
import boto3
from boto3.dynamodb.conditions import Key
import logging
# ロガーの設定
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
# リクエストボディから登録データを取得
body = json.loads(event['body'])
orderId = body['orderId']
name = body['name']
order = body['order']
date = body['date']
# DynamoDBを取得
dynamoDB = boto3.resource('dynamodb')
# DynamoDBのテーブル名
table = dynamoDB.Table('History')
try:
# DynamoDBへクエリ実行
table.put_item(
Item={
'orderId': orderId,
'name': name,
'order': order,
'date': date
}
)
except Exception as e:
# エラーをCloudWatchに出力
logger.error(e)
return {
'statusCode': 500,
'body': json.dumps('Internal Server Error.')
}
# 正常終了
return {
'statusCode': 200,
'headers': { "Content-Type": "application/json; charset=UTF-8"},
'body': json.dumps('OK.')
}
データの登録イメージ
複数の書き込み
import json
import boto3
from boto3.dynamodb.conditions import Key
import logging
import random
# ロガーの設定
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
# リクエストボディから登録データを取得
body = json.loads(event['body'])
name = body['name']
order = body['order']
date = body['date']
# DynamoDBを取得
dynamoDB = boto3.resource('dynamodb')
# DynamoDBのテーブル名
table = dynamoDB.Table('History')
try:
# DynamoDBへクエリ実行
with table.batch_writer() as batch:
for i in range(10):
batch.put_item(
Item={
# とりあえずIDは乱数で生成して0埋め
'orderId': str(random.randint(1, 9999)).zfill(4),
'name': name,
'order': order,
'date': date
}
)
except Exception as e:
# エラーをCloudWatchに出力
logger.error(e)
return {
'statusCode': 500,
'body': json.dumps('Internal Server Error.')
}
# 正常終了
return {
'statusCode': 200,
'headers': {"Content-Type": "application/json; charset=UTF-8"},
'body': json.dumps('OK.')
}