【AWS】DynamoDBを理解する【クエリ編】

 

DynamoDBからデータを読み書きする

  • LambdaからDynamoDBへアクセスしてデータの読み書きを行う

  • Lambdaはpythonで実装

  • API Gateway→Lambda→DynamoDBの処理を想定

     

 

f:id:kanmi-program:20201029213504p:plain

 

 

<参考>

DynamoDB boto3 API

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#client

 

読み込み

プライマリキーでデータを取得

import json
import boto3
from boto3.dynamodb.conditions import Key
import logging

# ロガーの設定
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
   # リクエストから検索条件を取得
   orderId = event['queryStringParameters']['orderId']
   
   # DynamoDBを取得
   dynamoDB = boto3.resource('dynamodb')
   
   # DynamoDBのテーブル名
   table = dynamoDB.Table('History')
   
   try:
       # DynamoDBへクエリ実行
       queryData = table.get_item(Key={'orderId' : orderId})
       
   except Exception as e:
       # エラーをCloudWatchに出力
       logger.error(e)
       return {
           'statusCode': 500,
           'body': json.dumps("Internal Server Error.")
      }
   
   # 検索結果を取得
   orderInfo = queryData['Item']
   
   if orderInfo is None:
       return {
           'statusCode': 404,
           'body': json.dumps('Order Not Found.')
      }

   # 正常終了
   return {
       'statusCode': 200,
       'headers': { "Content-Type": "application/json; charset=UTF-8"},
       'body': json.dumps(orderInfo)
  }

 

強力な整合性のある読み込みをする場合は「ConsistentRead=true」を設定

queryData = table.get_item(
   Key={'Id' : userId},
   ConsistentRead=True
)

 

プライマリキーとソートキーでデータを取得

import json
import boto3
from boto3.dynamodb.conditions import Key
import logging

# ロガーの設定
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
   # リクエストから検索条件を取得
   name = event['queryStringParameters']['name']
   # 属性「order」がソートキーの場合
   order = event['queryStringParameters']['order']
   
   # DynamoDBを取得
   dynamoDB = boto3.resource('dynamodb')
   
   # DynamoDBのテーブル名
   table = dynamoDB.Table('History')
   
   try:
       # DynamoDBへクエリ実行
       # プライマリキー、ソートキーを指定する
       queryData = table.get_item(Key={'name' : name, 'order': order})
       
   except Exception as e:
       # エラーをCloudWatchに出力
       logger.error(e)
       return {
           'statusCode': 500,
           'body': json.dumps("Internal Server Error.")
      }
   
   # 検索結果を取得
   orderInfo = queryData['Item']
   
   if orderInfo is None:
       return {
           'statusCode': 404,
           'body': json.dumps('Order Not Found.')
      }

   # 正常終了
   return {
       'statusCode': 200,
       'headers': { "Content-Type": "application/json; charset=UTF-8"},
       'body': json.dumps(orderInfo)
  }

 

 

セカンダリインデックスでデータを取得

import json
import boto3
from boto3.dynamodb.conditions import Key
import logging

# ロガーの設定
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
   # リクエストから検索条件を取得
   order = event['queryStringParameters']['order']
   
   # DynamoDBを取得
   dynamoDB = boto3.resource('dynamodb')
   
   # DynamoDBのテーブル名
   table = dynamoDB.Table('History')
   
   try:
       # DynamoDBへクエリ実行
       queryData = table.query(
           IndexName='order-index',
           KeyConditionExpression=Key('order').eq(order)
      )
       
   except Exception as e:
       return {
       # エラーをCloudWatchに出力
     logger.error(e)
           'statusCode': 500,
           'body': json.dumps("Internal Server Error.")
      }
   
   # 検索結果を取得
   orderInfoList = queryData['Items']
   
   if not orderInfoList:
       return {
           'statusCode': 404,
           'body': json.dumps('Users Not Found.')
      }
   
   # 正常終了
   return {
       'statusCode': 200,
       'headers': { "Content-Type": "application/json; charset=UTF-8"},
       'body': json.dumps(orderInfoList)
  }

 

全件取得

最大1MBのデータを取得可能。

それ以上は再度取得処理を実行する。

import json
import boto3
from boto3.dynamodb.conditions import Key
import logging

# ロガーの設定
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
   
   # DynamoDBを取得
   dynamoDB = boto3.resource('dynamodb')
   
   # DynamoDBのテーブル名
   table = dynamoDB.Table('History')
   
   try:
       # DynamoDBへクエリ実行
       queryData = table.scan()
       
   except Exception as e:
       # エラーをCloudWatchに出力
       logger.error(e)
       return {
           'statusCode': 500,
           'body': json.dumps("Internal Server Error.")
      }
   
   # 検索結果を取得
   orderInfo = queryData['Items']

   logger.info(orderInfo)

   # 正常終了
   return {
       'statusCode': 200,
       'headers': { "Content-Type": "application/json; charset=UTF-8"},
       'body': json.dumps(orderInfo)
  }

 

データが上限を超える場合「LastEvaluatedKey」を使って再度取得する。

import json
import boto3
from boto3.dynamodb.conditions import Key
import logging

# ロガーの設定
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
   
   # DynamoDBを取得
   dynamoDB = boto3.resource('dynamodb')
   
   # DynamoDBのテーブル名
   table = dynamoDB.Table('History')
   
   try:
       # DynamoDBへクエリ実行
       queryData = table.scan()

       # 検索結果を取得
       orderInfo = queryData['Items']

       # 取得上限を超えた場合、再度クエリを実行
       while 'LastEvaluatedKey' in queryData:
           queryData = table.scan(
               ExclusiveStartKey = queryData['LastEvaluatedKey']
          )
           # 検索結果をマージ
           orderInfo += queryData['Items']
       
   except Exception as e:
       # エラーをCloudWatchに出力
       logger.error(e)
       return {
           'statusCode': 500,
           'body': json.dumps("Internal Server Error.")
      }

   # 正常終了
   return {
       'statusCode': 200,
       'headers': { "Content-Type": "application/json; charset=UTF-8"},
       'body': json.dumps(orderInfo)
  }

 

取得したデータを絞り込む

指定したキーでデータを取得した後に絞り込むため、

DynamoDBからの転送量やキャパシティユニットの使用量は減らない。

scanを使用した場合もscan結果に対してFilter処理を実行するため、最大1MBの制限は変わらない。

import json
import boto3
from boto3.dynamodb.conditions import Key, Attr
import logging

# ロガーの設定
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
   # リクエストから検索条件を取得
   order = event['queryStringParameters']['order']
   date = event['queryStringParameters']['date']
   
   # DynamoDBを取得
   dynamoDB = boto3.resource('dynamodb')
   
   # DynamoDBのテーブル名
   table = dynamoDB.Table('History')
   
   try:
       # DynamoDBへクエリ実行
       queryData = table.query(
           IndexName='order-index',
           KeyConditionExpression=Key('order').eq(order),
           FilterExpression=Attr('date').eq(date)
      )
       
   except Exception as e:
       # エラーをCloudWatchに出力
       logger.error(e)
       return {
           'statusCode': 500,
           'body': json.dumps("Internal Server Error.")
      }
   
   # 検索結果を取得
   orderInfoList = queryData['Items']
   
   if not orderInfoList:
       return {
           'statusCode': 404,
           'body': json.dumps('Order Not Found.')
      }
   
   # 正常終了
   return {
       'statusCode': 200,
       'headers': { "Content-Type": "application/json; charset=UTF-8"},
       'body': json.dumps(orderInfoList)
  }

 

書き込み

プライマリキーのデータが存在しなかったら追加、すでにある場合は更新。

import json
import boto3
from boto3.dynamodb.conditions import Key
import logging

# ロガーの設定
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
   # リクエストボディから登録データを取得
   body = json.loads(event['body'])
   orderId = body['orderId']
   name = body['name']
   order = body['order']
   date = body['date']
   
   # DynamoDBを取得
   dynamoDB = boto3.resource('dynamodb')
   
   # DynamoDBのテーブル名
   table = dynamoDB.Table('History')
   
   try:
       # DynamoDBへクエリ実行
       table.put_item(
Item={
               'orderId': orderId,
               'name': name,
               'order': order,
               'date': date
          }
      )
       
   except Exception as e:
       # エラーをCloudWatchに出力
       logger.error(e)
       return {
           'statusCode': 500,
           'body': json.dumps('Internal Server Error.')
      }
   

   # 正常終了
   return {
       'statusCode': 200,
       'headers': { "Content-Type": "application/json; charset=UTF-8"},
       'body': json.dumps('OK.')
  }

 

データの登録イメージ

 

f:id:kanmi-program:20201029213500p:plain

 

複数の書き込み

import json
import boto3
from boto3.dynamodb.conditions import Key
import logging
import random

# ロガーの設定
logger = logging.getLogger()
logger.setLevel(logging.INFO)


def lambda_handler(event, context):
   # リクエストボディから登録データを取得
   body = json.loads(event['body'])
   name = body['name']
   order = body['order']
   date = body['date']

   # DynamoDBを取得
   dynamoDB = boto3.resource('dynamodb')

   # DynamoDBのテーブル名
   table = dynamoDB.Table('History')

   try:
       # DynamoDBへクエリ実行
       with table.batch_writer() as batch:
           for i in range(10):
               batch.put_item(
                   Item={
                       # とりあえずIDは乱数で生成して0埋め
                       'orderId': str(random.randint(1, 9999)).zfill(4),
                       'name': name,
                       'order': order,
                       'date': date
                  }
              )

   except Exception as e:
       # エラーをCloudWatchに出力
       logger.error(e)
       return {
           'statusCode': 500,
           'body': json.dumps('Internal Server Error.')
      }

   # 正常終了
   return {
       'statusCode': 200,
       'headers': {"Content-Type": "application/json; charset=UTF-8"},
       'body': json.dumps('OK.')
  }

【AWS】DynamoDBを理解する【概要編】

 

 

DynamoDB?

  • NoSQLデータベース

  • 結果整合性のある読み込み

    書き込みの保障はあるが、読み込みの保障はない。

 (書き込まれたデータが確実に取得できるとは限らない)

  • RDSとかと違って検索機能が弱い

  • 容量無制限

  • 高い可用性と耐久性

  AWSドキュメントに以下の記載あり。

DynamoDB は十分な数のサーバー間でデータとトラフィックを自動的に分散し、一貫した高速なパフォーマンスを維持したまま、スループットとストレージの要件に対応します。すべてのデータは SSD (Solid State Disk) に保存され、1 つの AWS リージョン内の複数のアベイラビリティーゾーン間で自動的にレプリケートすることによって、高い可用性とデータ堅牢性を実現します。

 

f:id:kanmi-program:20201029000111p:plain

 

<参考>

Amazon DynamoDB とは

https://docs.aws.amazon.com/ja_jp/amazondynamodb/latest/developerguide/Introduction.html

 

 

料金

オンデマンドモードとプロビジョンドモードによって違いあり。

 

プロビジョンドモード

課金対象一覧

<参考>

●プロビジョニングされたキャパシティーの料金

https://aws.amazon.com/jp/dynamodb/pricing/provisioned/

機能   内容 請求単位
主要機能      
  プロビジョニングされた書き込みキャパシティー テーブルにデータを書き込む WCU
  プロビジョニングされた読み込みキャパシティー テーブルからデータを読み込む RCU
  データストレージ データを格納する (インデックス値を含む) GB 月単位
オプション機能      
  継続的バックアップ 過去 35 日間のバックアップを継続的に取得する GB 月単位
  オンデマンドバックアップ 過去のある時点におけるスナップショットバックアップを取得する GB 月単位
  バックアップからの復元 特定のスナップショットまたは時間のテーブルを復元する GB 単位
  グローバルテーブル データをレプリケートしてマルチリージョンかつマルチマスターなテーブルを作成する rWCU
  DynamoDB Accelerator (DAX) インメモリキャッシュのレイテンシーを低減する ノード時間単位
  DynamoDB ストリーム テーブルの項目レベルの変更に関する時系列シーケンスを提供する ストリーム読み込みリクエスト単位
  データ転送 (アウト) 他の AWS リージョンにデータを転送する 同一リージョン内なら無料 GB 単位

 

課金対象詳細

※2020/10時点の東京リージョンで計算

 

プロビジョニングされた書き込みキャパシティー

  • WCUの割り当て数によって課金

  • 1WCU/1時間あたり 0.000742USD≒0.078円

  • 無料利用枠で月あたり25WCUまで無料

 

例)テーブルにプロビジョニングされたWCUの合計が30WCUの場合

  • 1カ月当たりのWCU単価

    1WCU/月=0.53424USD(24時間×30日で計算)

  • 1カ月当たりのWCU割り当て数 - 無料利用枠

    30WCU/月 - 25WCU/月 = 5WCU

  • 日本円に変換

    5WCU/月=2.6712USD≒278.48 円

 

プロビジョニングされた読み込みキャパシティー

  • RCUの割り当て数によって課金

  • 1RCU/1時間あたり 0.0001484USD≒0.016円

  • 無料利用枠で月あたり25RCUまで無料

 

例)テーブルにプロビジョニングされたRCUの合計が30RCUの場合

  • 1カ月当たりのRCU単価

    1RCU/月=0.106848USD(24時間×30日で計算)

  • 1カ月当たりのWCU割り当て数 - 無料利用枠

    30WCU/月 - 25WCU/月 = 5WCU

  • 日本円に変換

    5WCU/月=0.53424USD≒55.71 円

 

 

データストレージ

  • 下記の合計が課金対象

    • アップロードするデータの raw バイトサイズ

    • 各項目のストレージオーバーヘッドとしてインデックス作成のための 100 バイト

  • 月/GBあたり0.285USD≒29.71 円

  • 無料利用枠で25 GBまで無料

 

オプション

あとで書く

 

オンデマンド

あとで書く

 

無料利用枠

キャパシティモードによって違いあり。

 

オンデマンド

  • 25 GB のデータストレージ

  • DynamoDB ストリームからのストリーム読み込みリクエスト 250 万回

  • AWS のサービス全体での合計データ転送 (アウト) 1 GB

  • 読み込み、書き込みリクエストは無料利用枠の対象外

 

プロビジョンド

  • WCU 25 個と RCU 25 個のプロビジョニングされたキャパシティー

  • 25 GB のデータストレージ

  • 2 つの AWS リージョンにデプロイされたグローバルテーブルに対して rWCU 25 個

  • DynamoDB ストリームからの 250 万回のストリーム読み込み要求

  • AWS のすべてのサービスで合計して 1 GB までのデータ転送 (アウト) (最初の 12 か月で 15 GB)

 

 

言語集

 

項目

SQLのレコードと一緒

 

属性

SQLのカラムとか列とかと一緒

 

キャパシティモード

以下の2種類あり。

 

オンデマンド

  • 事前の容量割り当て無し

    • 使用した分だけ課金が発生

  • 1秒あたり数千リクエストを処理可能

  • トラフィック量に合わせて柔軟に対応

 

プロビジョニング

  • 事前にキャパシティを割り当て(WCU/RCU)

    • 割り当てたキャパシティ数によって課金が発生

  • スループットは割り当てたキャパシティによって変化

  • オプションのAuto Scalingを設定することでトラフィックに応じて自動でキャパシティ数を増加

 

 

プライマリキー

  • テーブルの各項目を一意に識別するもの

  • プライマリキー属性に設定可能なデータ型はString、Number、Binary

以下の2種類あり。

 

パーティションキー

  • 1つの属性で構成されるシンプルなプライマリキー

 

パーティションキーとソートキー

  • 複合主キーと呼ばれるもの

  • パーティションキーとソートキーで構成される

  • テーブル内で同一パーティションキーを持つことができるが、その場合はソートキーは異なる必要あり

 

 

セカンダリインデックス

  • プライマリキーとは別に検索等に使用可能な代替キー

  • セカンダリインデックスを使用して取得するデータは事前に射影した属性を取得する

以下の2種類あり。

 

グローバルセカンダリインデックス(GSI)

  • テーブル上のものとは異なるパーティションキーとソートキーを持つインデックス

  • インデックスにはプライマリキーは必須で、ソートキーは必須ではない

  • テーブルにプロビジョニングされたキャパシティとは別にキャパシティを使用する

 

ローカルセカンダリインデックス(LSI

  • テーブルと同じパーティションキーを持つが、異なるソートキーを持つインデックス

  • テーブル作成時にしか追加できない

  • インデックスにはパーティションキーとソートキーが必須

  • テーブルにプロビジョニングされたユニットを使用する

 

 

射影

  • セカンダリインデックスに使用するテーブル

    • セカンダリインデックスで検索して取得する属性を選択

    • 選択した属性分だけ別途ストレージを使用

 

 

読み込み方式

データ読み込み方式は下記の2種類あり。

 

結果整合性のある読み込み

DynamoDB テーブルからの読み込みオペレーションの応答には、最近の書き込みオペレーションの結果が反映されていないことがあります。応答には古いデータが含まれる場合があります。少し時間がたってから読み込みリクエストを繰り返すと、応答で最新のデータが返されます。

 

強力な整合性のある読み込み

強力な整合性のある読み込みをリクエストすると、DynamoDB は成功した以前のすべての書き込みオペレーションからの更新が反映された最新データの応答を返します。ただし、この整合性には以下のような欠点があります。

  • ネットワーク遅延や障害が発生した場合、協力な整合性のある読み込みができない時がある

    この場合、DynamoDBはサーバーエラー(HTTP 500)を返却する

  • 結果整合性のある読み込みよりも待ち時間が長くなる可能性がある

  • グローバルセカンダリインデックスではサポートされていない(2020/10時点)

  • 結果整合性のある読み込みよりも2倍キャパシティユニットを使用する

 

<参考>

Read Consistency

https://docs.aws.amazon.com/ja_jp/amazondynamodb/latest/developerguide/HowItWorks.ReadConsistency.html

 

Read/Write Capacity Mode

https://docs.aws.amazon.com/ja_jp/amazondynamodb/latest/developerguide/HowItWorks.ReadWriteCapacityMode.html

 

 

リクエストユニット

データの読み込み、書き込みの単位。

 

読込ユニット(RCU)

  • 1RCUは1秒間に1回の読み込みを行う

  • 1RCUのサイズは4KB

  • 強く一貫性のある読み取りの場合は1RCUを使用

  • 結果一貫性のある読み取りの場合は0.5RCUを使用

 

書き込みユニット(WCU)

  • 1WCUは1秒間に1回の書き込みを行う

  • 1WCUのサイズは1KB

  • 通常の書き込みの場合は1WCUを使用

  • トランザクション書き込みの場合は2WCUを使用

 

Auto Scaling

 

<参考>

DynamoDB Auto Scaling によるスループットキャパシティーの自動管理

https://docs.aws.amazon.com/ja_jp/amazondynamodb/latest/developerguide/AutoScaling.html

 

 

制限事項

<参考>

https://docs.aws.amazon.com/ja_jp/amazondynamodb/latest/developerguide/Limits.html