Big Data, Cloud

Stream Twitter Data ke S3 dengan Amazon Kinesis

Pada artikel sebelumnya kita telah belajar menggunakan Twitter API dengan Python. Selanjutnya stream data dari Twitter API tadi akan diteruskan ke Amazon S3 melalui Amazon Kinesis

Amazon S3 adalah layanan object storage yang menawarkan skalabilitas, ketersediaan data dan keamanan yang tinggi.

Amazon S3 biasanya digunakan sebagai data lake yang menyimpan berbagai macam data baik berupa data terstruktur maupun tidak terstruktur

Note: Untuk lebih mengenal Amazon S3 bisa dibaca artikel tutorial dasar Amazon S3

Amazon Kinesis adalah layanan yang terkelola (managed service), dapat diskalakan yang digunakan untuk pemrosesan stream data dengan jumlah besar per detik

Skenario Arsitektur Data

Stream Twitter Data ke S3 dengan Amazon Kinesis

Tweet dari Twitter API akan dibaca di EC2 instance dengan menggunakan Library Tweepy seperti pada artikel sebelumnya

Data yang dikumpulkan akan dilakukan preprocessing untuk hanya diambil beberapa atribut seperti name, screen name, tweet text, follower count, location, geo dan created_at

Data hasil preprocessing akan distream ke S3 melalui Amazon Kinesis dengan menggunkan fungsi put_record dengan parameter nama delivery_stream dan data tweet yang sudah bersih

Data akan disimpan ke dalam Amazon S3 dan dilakukan partisi berdasarkan waktu. Nantinya data di S3 dapat diproses kembali untuk dianalisis misalnya dengan menggunakan Amazon Athena atau Amazon EMR

Membuat Kinesis Data Delivery Stream

Langkah pertama adalah membuat Kinesis data delivery stream melalui AWS Console dan pilih layanan Kinesis

Pada Data Firehouse, pilih create delivery system

Stream Twitter Data ke S3 dengan Amazon Kinesis

Step 1, Input nama delivery stream dan Next

Step 2, biarkan default dan Next

Step 3, Pilih Amazon S3 sebagai destinasi, pilih bucket dan Next

Stream Twitter Data ke S3 dengan Amazon Kinesis

Step 4, biarkan default dan Next

Step 5, review konfigurasi yang telah kita buat dan pilih create delivery stream

Stream Twitter data ke Amazon Kinesis

Setelah menyelesaikan konfigurasi Amazon Kinesis, selanjutnya buatlah script Python seperti pada artikel sebelumnya untuk membaca stream data dan diteruskan ke Amazon Kinesis

Pertama, pada fungsi on_data() hasil stream harus dipreproces terlebih dahulu

Tweet yang distream selanjutnya diconvert dalam bentuk json dan diambil beberapa atribut seperti yang sudah kita definisikan diatas seperti nama, screen name, tweet text dll

Gunakan fungi put_record() untuk meneruskan data yang sudah terstruktur ke Amazon Kinesis dengan parameter nama delivery system dan data yang sudah bersih

Untuk mengakses layanan Amazon Kinesis kita harus sudah menginstall library boto3 untuk authentikasi

Note: Untuk lebih jelas mengenai boto3 silahkan baca artikel tutorial dasar boto3 untuk Amazon S3

Berikut script lengkap untuk stream data ke Amazon Kinesis

from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
from tweepy import Stream
import json
import boto3
import time
import tweepy

# Step 1 - Authenticate
consumer_key= ''
consumer_secret= ''

access_token=''
access_token_secret=''

auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)

api = tweepy.API(auth)

#This is a basic listener that just prints received tweets to stdout.
class StdOutListener(StreamListener):
    
    def on_data(self, data):
        # print(data)
        tweet = json.loads(data)
        try:
            if 'extended_tweet' in tweet.keys():
                #print (tweet['text'])
                message_lst = [str(tweet['id']),
                       str(tweet['user']['name']),
                       str(tweet['user']['screen_name']),
                       tweet['extended_tweet']['full_text'],
                       str(tweet['user']['followers_count']),
                       str(tweet['user']['location']),
                       str(tweet['geo']),
                       str(tweet['created_at']),
                       '\n'
                       ]
                message = '\t'.join(message_lst)
                print(message)
                client.put_record(
                    DeliveryStreamName=delivery_stream,
                    Record={
                    'Data': message
                    }
                )
            elif 'text' in tweet.keys():
                #print (tweet['text'])
                message_lst = [str(tweet['id']),
                       str(tweet['user']['name']),
                       str(tweet['user']['screen_name']),
                       tweet['text'].replace('\n',' ').replace('\r',' '),
                       str(tweet['user']['followers_count']),
                       str(tweet['user']['location']),
                       str(tweet['geo']),
                       str(tweet['created_at']),
                       '\n'
                       ]
                message = '\t'.join(message_lst)
                print(message)
                client.put_record(
                    DeliveryStreamName=delivery_stream,
                    Record={
                    'Data': message
                    }
                )
        except (AttributeError, Exception) as e:
                print (e)
        return True

    def on_error(self, status):
        print(status)

if __name__ == '__main__':

    #This handles Twitter authetification and the connection to Twitter Streaming API
    l = StdOutListener()
    auth = OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_token_secret)

    # boto3
    client = boto3.client('firehose', 
                          region_name='us-east-1',
                          aws_access_key_id='',
                          aws_secret_access_key='',
                          aws_session_token=''
                          )

    delivery_stream = 'twitter-data-stream'

    stream = Stream(auth, l)

    #This line filter tweets from the words.
    stream.filter(track=['indonesia', 'gedung sate'])

Referensi kode dapat dilihat di github.com/walkyrie67

Amazon Kinesis akan menyimpan data ke dalam data lake di Amazon S3 yang dipartisi berdasarkan waktu (year/month/day)

Stream Twitter Data ke S3 dengan Amazon Kinesis

Leave a Reply

Your email address will not be published. Required fields are marked *