Pada artikel sebelumnya kita telah belajar menggunakan Twitter API dengan Python. Selanjutnya stream data dari Twitter API tadi akan diteruskan ke Amazon S3 melalui Amazon Kinesis
Amazon S3 adalah layanan object storage yang menawarkan skalabilitas, ketersediaan data dan keamanan yang tinggi.
Amazon S3 biasanya digunakan sebagai data lake yang menyimpan berbagai macam data baik berupa data terstruktur maupun tidak terstruktur
Note: Untuk lebih mengenal Amazon S3 bisa dibaca artikel tutorial dasar Amazon S3
Amazon Kinesis adalah layanan yang terkelola (managed service), dapat diskalakan yang digunakan untuk pemrosesan stream data dengan jumlah besar per detik
Skenario Arsitektur Data
Tweet dari Twitter API akan dibaca di EC2 instance dengan menggunakan Library Tweepy seperti pada artikel sebelumnya
Data yang dikumpulkan akan dilakukan preprocessing untuk hanya diambil beberapa atribut seperti name, screen name, tweet text, follower count, location, geo dan created_at
Data hasil preprocessing akan distream ke S3 melalui Amazon Kinesis dengan menggunkan fungsi put_record dengan parameter nama delivery_stream dan data tweet yang sudah bersih
Data akan disimpan ke dalam Amazon S3 dan dilakukan partisi berdasarkan waktu. Nantinya data di S3 dapat diproses kembali untuk dianalisis misalnya dengan menggunakan Amazon Athena atau Amazon EMR
Membuat Kinesis Data Delivery Stream
Langkah pertama adalah membuat Kinesis data delivery stream melalui AWS Console dan pilih layanan Kinesis
Pada Data Firehouse, pilih create delivery system
Step 1, Input nama delivery stream dan Next
Step 2, biarkan default dan Next
Step 3, Pilih Amazon S3 sebagai destinasi, pilih bucket dan Next
Step 4, biarkan default dan Next
Step 5, review konfigurasi yang telah kita buat dan pilih create delivery stream
Stream Twitter data ke Amazon Kinesis
Setelah menyelesaikan konfigurasi Amazon Kinesis, selanjutnya buatlah script Python seperti pada artikel sebelumnya untuk membaca stream data dan diteruskan ke Amazon Kinesis
Pertama, pada fungsi on_data() hasil stream harus dipreproces terlebih dahulu
Tweet yang distream selanjutnya diconvert dalam bentuk json dan diambil beberapa atribut seperti yang sudah kita definisikan diatas seperti nama, screen name, tweet text dll
Gunakan fungi put_record() untuk meneruskan data yang sudah terstruktur ke Amazon Kinesis dengan parameter nama delivery system dan data yang sudah bersih
Untuk mengakses layanan Amazon Kinesis kita harus sudah menginstall library boto3 untuk authentikasi
Note: Untuk lebih jelas mengenai boto3 silahkan baca artikel tutorial dasar boto3 untuk Amazon S3
Berikut script lengkap untuk stream data ke Amazon Kinesis
from tweepy.streaming import StreamListener from tweepy import OAuthHandler from tweepy import Stream import json import boto3 import time import tweepy # Step 1 - Authenticate consumer_key= '' consumer_secret= '' access_token='' access_token_secret='' auth = tweepy.OAuthHandler(consumer_key, consumer_secret) auth.set_access_token(access_token, access_token_secret) api = tweepy.API(auth) #This is a basic listener that just prints received tweets to stdout. class StdOutListener(StreamListener): def on_data(self, data): # print(data) tweet = json.loads(data) try: if 'extended_tweet' in tweet.keys(): #print (tweet['text']) message_lst = [str(tweet['id']), str(tweet['user']['name']), str(tweet['user']['screen_name']), tweet['extended_tweet']['full_text'], str(tweet['user']['followers_count']), str(tweet['user']['location']), str(tweet['geo']), str(tweet['created_at']), '\n' ] message = '\t'.join(message_lst) print(message) client.put_record( DeliveryStreamName=delivery_stream, Record={ 'Data': message } ) elif 'text' in tweet.keys(): #print (tweet['text']) message_lst = [str(tweet['id']), str(tweet['user']['name']), str(tweet['user']['screen_name']), tweet['text'].replace('\n',' ').replace('\r',' '), str(tweet['user']['followers_count']), str(tweet['user']['location']), str(tweet['geo']), str(tweet['created_at']), '\n' ] message = '\t'.join(message_lst) print(message) client.put_record( DeliveryStreamName=delivery_stream, Record={ 'Data': message } ) except (AttributeError, Exception) as e: print (e) return True def on_error(self, status): print(status) if __name__ == '__main__': #This handles Twitter authetification and the connection to Twitter Streaming API l = StdOutListener() auth = OAuthHandler(consumer_key, consumer_secret) auth.set_access_token(access_token, access_token_secret) # boto3 client = boto3.client('firehose', region_name='us-east-1', aws_access_key_id='', aws_secret_access_key='', aws_session_token='' ) delivery_stream = 'twitter-data-stream' stream = Stream(auth, l) #This line filter tweets from the words. stream.filter(track=['indonesia', 'gedung sate'])
Referensi kode dapat dilihat di github.com/walkyrie67
Amazon Kinesis akan menyimpan data ke dalam data lake di Amazon S3 yang dipartisi berdasarkan waktu (year/month/day)