Skip to content

Commit

Permalink
first
Browse files Browse the repository at this point in the history
  • Loading branch information
徳住友稜 committed Dec 7, 2019
0 parents commit edf0ad3
Show file tree
Hide file tree
Showing 5 changed files with 294 additions and 0 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
__pycache__
env
.mypy_cache
.vscode
1 change: 1 addition & 0 deletions .python-version
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
3.6.8
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Qiita notification job with AWS Lambda and Dynamo DB Stream.
this repository includes two code, which apply to Lambda one by one.

## /qiita_iine_collect/check_new_iine_dev.py
- collect all articles iine by Qiita API v2
- update logs in Dynamo DB to stream differences, which is target of notification

## /qiita_notification/send_notification.py
- get stream data of Dynamo DB
- notify via LINE Notify
121 changes: 121 additions & 0 deletions qiita_iine_collect/check_new_iine_dev.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import os
from math import ceil
from typing import List, Dict, Any, Union, Tuple
import json
from urllib.request import Request
from urllib import request, parse, error
from http.client import HTTPResponse
import boto3
from botocore.exceptions import ClientError


class Response():
"""Http Response Object"""

def __init__(self, res: HTTPResponse):
self.body = self._json(res)
self.status_code = self._status_code(res)
self.headers = self._headers(res)

def _json(self, res: HTTPResponse):
return json.loads(res.read())

def _status_code(self, res: HTTPResponse) -> int:
return res.status

def _headers(self, res: HTTPResponse) -> Dict[str, str]:
return dict(res.getheaders())


def req_get(url, headers=None, params=None) -> Response:
"""get request. simplified request function of Requests
:return: Response object
"""
if params:
url = '{}?{}'.format(url, parse.urlencode(params))

req = Request(url, headers=headers, method='GET')

with request.urlopen(req) as res:
response = Response(res)
return response


def serialize_response(response: Response) -> List[Dict[str, Any]]:
"""serialize response of Qiita API v2
:param response:
:return:
"""
keys = ['id', 'title', 'likes_count']
return [
{f: resp.get(f) for f in keys} for resp in response.body
]


def get_item(url: str, headers: Dict[str, str], **param) -> List[Dict[str, Any]]:
"""get a item by Qiita API v2 and return the list of serialized response (dictionary)"""
response = req_get(url, headers=headers, params=param)
return serialize_response(response)


def get_items(token: str, per_page=1, url='https://qiita.com/api/v2/authenticated_user/items') -> List[Dict[str, Any]]:
"""ページネーションして認証ユーザの全ての記事を取得する
:return: 記事のリスト
"""
headers = {'Authorization': 'Bearer {}'.format(token)}

response: Response = req_get(url, headers=headers, params={'page': 1, 'per_page': per_page})
items = serialize_response(response)
tot_count = int(response.headers['Total-Count'])
tot_pages = ceil(tot_count / per_page)
if tot_pages <= 1:
return items

for page in range(2, tot_pages + 1):
items += get_item(url, headers, page=page, per_page=per_page)
return items


def update_logs(items: List[Dict[str, Any]]):
"""Update the number of iine in Dynamo DB
If item ID do not exist in Dynamo DB, insert them in it
"""
dynamodb = boto3.resource('dynamodb')

table = dynamodb.Table('iine_qiita_logs')

for item in items:
ids = item.get('id')
title = item.get('title')
iine = item.get('likes_count')

try:
response = table.update_item(
Key={
'ids': ids
},
UpdateExpression="set iine = :newiine, title = :title",
ConditionExpression="attribute_not_exists(ids) or iine <> :newiine",
ExpressionAttributeValues={
":newiine": iine,
":title": title
},
)
except ClientError as e:
if e.response['Error']['Code'] == "ConditionalCheckFailedException":
print(e.response['Error']['Message'])
else:
raise


def main(client, content):
"""this is handler function for Lambda"""
qiita_token: str = os.environ['QIITA_TOKEN']
url: str = os.environ['QIITA_URL']
per_page = int(os.environ['PER_PAGE'])

items: List[Dict[str, Any]] = get_items(qiita_token, per_page=per_page, url=url)
update_logs(items)
return {
'statusCode': 200
}
158 changes: 158 additions & 0 deletions qiita_notification/send_new_iine_dev.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
import json
import os
from math import ceil
from typing import List, Dict, Any, Union, Tuple
import json
from urllib.request import Request
from urllib import request, parse, error
from http.client import HTTPResponse


class Response():
"""Http Response Object"""

def __init__(self, res: HTTPResponse):
self.body = self._json(res)
self.status_code = self._status_code(res)
self.headers = self._headers(res)

def _json(self, res: HTTPResponse):
return json.loads(res.read())

def _status_code(self, res: HTTPResponse) -> int:
return res.status

def _headers(self, res: HTTPResponse) -> Dict[str, str]:
return dict(res.getheaders())


def req_get(url: str, headers=None, params=None) -> Response:
"""get request. simplified request function of Requests
:return: Response object
"""
if params:
url = '{}?{}'.format(url, parse.urlencode(params))

req = Request(url, headers=headers, method='GET')

with request.urlopen(req) as res:
response = Response(res)
return response


def req_post(url: str, data: Dict[str, Any], headers=None, params=None) -> Response:
"""post request. simplified request function of Requests
:return: Response object
"""
if headers.get('Content-Type') == 'application/x-www-form-urlencoded':
encoded_data = parse.urlencode(data).encode()

else:
encoded_data = json.dumps(data).encode()

req = Request(url, data=encoded_data, headers=headers, method='POST')

with request.urlopen(req) as res:
response = Response(res)
return response


def serialize_record(record: Dict[str, Any]) -> Dict[str, Any]:
"""serialize data of Dynamo DB Stream
:return:
"""
if record.get('eventName') != 'MODIFY':
return {}

past = record.get('dynamodb', {}).get('OldImage')
past_iine = int(past.get('iine', {}).get('N', 0))
ids = past.get('ids', {}).get('S', '')

new = record.get('dynamodb', {}).get('NewImage')
title = new.get('title', {}).get('S', '')
return {
'ids': ids,
'title': title,
'past_iine': past_iine
}


def serialize_response_name(response: Response, num: int, title: str) -> Dict[str, Any]:
"""serialize iine data of Qiita API v2
:param response:
:return:
"""
size = len(response.body) - num
if size <= 0:
users: List[str] = []

new_iine = response.body[:size]
users = [
resp.get('user', {}).get('id') for resp in new_iine
]
return {
'title': title,
'users': users
}


def get_new_iine(item: Dict[str, Any], token: str) -> Dict[str, Any]:
"""HTTP request to Qiita API v2
:params:
:return:
"""
headers = {'Authorization': 'Bearer {}'.format(token)}
ids = item.get('ids', '')
past_iine = item.get('past_iine', 0)
url = f'https://qiita.com/api/v2/items/{ids}/likes'

response = req_get(url, headers=headers)
title: str = item.get('title', '')
resp = serialize_response_name(response, past_iine, title)
return resp


def deserialize_response_name(response: Dict[str, Any], max_length=20) -> str:
"""deserialize text for LINE Notify
:param max_length: max sentence length
:return:
"""
names = ", ".join(response.get('users', []))
title = response.get('title', '')
title = f"{title}" if len(title) <= max_length else f"{title[:max_length]}..."
return f"\n{names}が「{title}」にいいねしました。"


def send_notification(message: str, token: str):
"""send notification by LINE notify"""
url = 'https://notify-api.line.me/api/notify'

headers = {
'Authorization': 'Bearer {}'.format(token),
'Content-Type': 'application/x-www-form-urlencoded'
}
msg = {
'message': message
}
response = req_post(url, data=msg, headers=headers)
return response.body


def lambda_handler(event, context):
"""main handler for Lambda"""
qiita_token = os.environ["QIITA_TOKEN"]
line_token = os.environ["LINE_TOKEN"]

records = event.get('Records', [])
for record in records:
serialized_data = serialize_record(record)
if not serialized_data:
continue
new_iines = get_new_iine(serialized_data, qiita_token)
if len(new_iines.get('users')) == 0:
continue
send_notification(deserialize_response_name(new_iines), line_token)

return {
'statusCode': 200,
}

0 comments on commit edf0ad3

Please sign in to comment.