|  | 
|  | 1 | +""" | 
|  | 2 | + | 
|  | 3 | +# https://github.com/jvroig/AWS-Experiments-Lambda-Multithreading/blob/main/Lambdas/mp_noQueue/mp_noQueue.py | 
|  | 4 | +# https://docs.python.org/3/library/multiprocessing.html | 
|  | 5 | +# https://ithelp.ithome.com.tw/articles/10242047#:~:text=%E7%B7%9A%E7%A8%8B%E5%8F%AF%E4%BB%A5%E6%83%B3%E5%83%8F%E6%88%90%E5%AD%98%E5%9C%A8,%E9%80%B2%E7%A8%8B%E6%AF%94%E5%96%BB%E7%82%BA%E4%B8%80%E5%80%8B%E5%B7%A5%E5%BB%A0%EF%BC%8C | 
|  | 6 | + | 
|  | 7 | +# https://stackoverflow.com/questions/38322574/python-multiprocessing-sharing-of-global-values | 
|  | 8 | + | 
|  | 9 | +""" | 
|  | 10 | + | 
|  | 11 | +#import multiprocessing | 
|  | 12 | +import json | 
|  | 13 | +import os | 
|  | 14 | +import boto3 | 
|  | 15 | +import datetime as dt | 
|  | 16 | +import concurrent.futures | 
|  | 17 | + | 
|  | 18 | +# counter = multiprocessing.Manager().Value('i', 0) | 
|  | 19 | +# lock = multiprocessing.Manager().Lock() | 
|  | 20 | + | 
|  | 21 | +BUCKET_NAME = "multiprocess-test" | 
|  | 22 | + | 
|  | 23 | + | 
def cur_time_formatter(format):
    """Return the current UTC time rendered with the given strftime *format*.

    Parameters
    ----------
    format : str
        An strftime-style format string (e.g. '%Y-%m-%d-%H-%M-%S').

    Returns
    -------
    str
        The formatted current UTC timestamp.
    """
    # Timezone-aware now() instead of datetime.utcnow(), which is
    # deprecated since Python 3.12; strftime output is identical.
    return dt.datetime.now(dt.timezone.utc).strftime(format)
|  | 27 | + | 
|  | 28 | + | 
def write_to_s3(s3_client, bucket_name, key_name, json_data):
    """Serialize *json_data* as JSON and upload it to s3://bucket/key.

    Best-effort: any failure from the S3 call (or serialization) is
    logged to stdout and swallowed rather than propagated.
    """
    print(
        f"s3_client = {s3_client}, bucket_name = {bucket_name}, "
        f"key_name = {key_name}, json_data = {json_data}"
    )
    try:
        payload = json.dumps(json_data)
        s3_client.put_object(Body=payload, Bucket=bucket_name, Key=key_name)
    except Exception as e:
        # Log and continue; callers do not depend on a return value.
        print(f"s3 put object failed : {str(e)}")
|  | 36 | + | 
|  | 37 | + | 
def upload_to_s3(data_id):
    """Worker entry point for one fan-out item: log which process handled it.

    The actual S3 upload logic is currently disabled (it referenced
    undefined names such as `thread_name` and `counter`); this stub only
    records the handling process id and the item id.

    Parameters
    ----------
    data_id : int
        Identifier of the work item (0..99 from lambda_handler).
    """
    # The original log line also interpolated the `input` BUILTIN
    # (printing "<built-in function input>") — a copy/paste leftover,
    # removed here.
    print(f"Process id = {os.getpid()}, data_id = {data_id}")
|  | 53 | + | 
def lambda_handler(event, context):
    """Fan out 100 upload_to_s3 calls over a small thread pool.

    Parameters
    ----------
    event, context
        Standard AWS Lambda invocation arguments (unused here).

    Returns
    -------
    dict
        API-Gateway-style response: statusCode 200 and a JSON 'OK' body.
    """
    # NOTE(review): the original created a boto3 S3 client and a
    # list(range(100)) here and used neither; both removed as dead work.
    print("concurrent start")
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        # map() is lazy about results, but all 100 tasks are submitted and
        # run; any worker exceptions are discarded since the iterator is
        # never consumed (matches original behavior).
        executor.map(upload_to_s3, range(100))
    print("concurrent end")
    return {
        'statusCode': 200,
        'body': json.dumps('OK')
    }
0 commit comments