Web Scraping Automation with Google Cloud Services
1) Introduction
We often need to run a code script automatically on a schedule. In this article, we will fetch updated data from a website every half hour and walk through storing and analyzing it on the Google Cloud Platform.
We will use the Cloud Scheduler, Cloud Functions, Cloud Storage, and BigQuery services on the Google Cloud Platform. The Python script that performs the crawling will run on Cloud Functions. The resulting data will be kept as a file in a Cloud Storage bucket and appended to a table in BigQuery. The Cloud Scheduler service will trigger the Python script in Cloud Functions every half hour, and a second function will be triggered through the Cloud Storage Trigger feature. Thus, we will see examples of triggering functions prepared in the Cloud Functions service in two different ways.
To follow along, the services mentioned above must be enabled in your Google Cloud Platform project.
Let’s briefly talk about the services we will use.
1.1. Cloud Scheduler: Cloud Scheduler is a cron-based service offered by Google Cloud Platform for automatically running tasks and functions at a specific time or on a recurring schedule. It can target HTTP and HTTPS endpoints as well as many other services offered by Google Cloud Platform.
1.2. Cloud Functions: Cloud Functions is a serverless service offered by Google Cloud Platform that lets developers run small pieces of code as functions without managing servers. The created functions can be triggered automatically by services such as Cloud Scheduler.
1.3. Cloud Storage: Cloud Storage is an object storage service that keeps user data on Google's infrastructure and makes it easily accessible.
1.4. BigQuery: BigQuery is a cloud data warehouse service offered by Google Cloud Platform where large data sets can be stored and analyzed; it runs fast, complex queries, scales automatically, and integrates with different BI applications.
2) Creating a Bucket in Cloud Storage
As the first step, we must go to Cloud Storage via the “Navigation Menu” in our GCP project to create a bucket in the Cloud Storage service. Bucket creation is performed by clicking the “CREATE” button on the Bucket tab in Cloud Storage and filling in the required information.
In this project, I proceeded with the “Multi-region” selection in the region section and left the “Storage class”, “control access”, and “how to protect object data” selections at their defaults. Bucket names must be globally unique, so choose the name carefully.
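If you prefer to script this step instead of using the console, a minimal sketch with the google-cloud-storage client might look like the following. The project ID and bucket name are the ones that appear later in this article's code; treat them as placeholders for your own values.

from google.cloud import storage

# Assumption: credentials are available locally, e.g. via GOOGLE_APPLICATION_CREDENTIALS.
client = storage.Client(project="kartaca-test")
# Multi-region "US" mirrors the "Multi-region" choice made in the console above.
bucket = client.create_bucket("test-bucket", location="US")
print(f"Created bucket: {bucket.name}")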
3) Creating a Function in Cloud Functions
We want to deploy the web scraping script, written and tested in Python on the local machine, to the Cloud Functions service. This function pulls exchange rate data from bigpara.hurriyet.com.tr, converts it to CSV format, and uploads it to the bucket we created in Cloud Storage.
To deploy to Cloud Functions, we start creating the function with the “Create Function” button on the Cloud Functions page.
On the screen that appears, we enter our information as follows. Since we plan to trigger with Cloud Scheduler, we choose HTTP as our trigger.
For advanced settings, we expand the “Runtime, build, connections, and security settings” section at the bottom. Here we could adjust the allocated resources according to the data size and workload we expect, as well as the connection and security options, but we proceed without making any changes in this part.
Clicking “Next” takes us to the “Source” section. Here we choose the runtime in which the code will run and add our function code following the Cloud Functions structure. We select Python 3.9 as the “Runtime”. If the code were already packaged, we could point the “Source code” option at a file instead; for this case, we manually add our code to the main.py file.
import requests
from bs4 import BeautifulSoup
from datetime import datetime
import json
import time
from google.cloud import storage
import unicodedata
import pandas as pd
import os
def crawler(request):
    date = datetime.today().strftime('%Y-%m-%d %H:%M:%S')
    print("Date:", date)

    # Free-market currency rates
    r = requests.get("https://bigpara.hurriyet.com.tr/doviz/")
    soup = BeautifulSoup(r.content, "html.parser")
    currency_list_len = len(
        soup.select('#content > div.contentLeft > div > div.tableCnt > div > div.tableBox.srbstPysDvz > '
                    'div.tBody > ul'))
    exchange_rate_dict = []
    for i in range(1, currency_list_len + 1):
        rate_info = {'date': date}
        main_search_string = (f'#content > div.contentLeft > div > div.tableCnt > div > div.tableBox.srbstPysDvz > '
                              f'div.tBody ul:nth-child({i})')
        data = soup.select_one(main_search_string)
        currency_code = data.select_one('li.cell010.tal > h3 > a > img')['src'].replace('/Assets/images/mini_bayrak/',
                                                                                        '').replace('.png', '')
        rate_info['currency_code'] = currency_code
        currency = data.select_one('li.cell010.tal > h3 > a').text
        rate_info['currency'] = currency
        forex_buying = data.select_one('li:nth-child(3)').text.replace(".", "").replace(",", ".")
        rate_info['forex_buying'] = forex_buying
        forex_selling = data.select_one('li:nth-child(4)').text.replace(".", "").replace(",", ".")
        rate_info['forex_selling'] = forex_selling
        forex_percent = data.select_one('li:nth-child(5)').text.replace(".", "").replace(",", ".").replace("%", "")
        rate_info['forex_percent'] = forex_percent
        exchange_rate_dict.append(rate_info)

    # Gold prices
    gold_index_list = [2, 3, 5, 6, 8]
    gold_currency_mapping = {
        'Gram Altın': 'GAU',
        'Ons Altın': 'ONS'
    }
    r2 = requests.get("https://uzmanpara.milliyet.com.tr/altin-fiyatlari/")
    soup = BeautifulSoup(r2.content, "html.parser")
    for i in gold_index_list:
        rate_info = {'date': date}
        main_search_string = f'#altinfiyat > tbody > tr:nth-child({i})'
        data = soup.select_one(main_search_string)
        currency = data.select_one('td.currency > a').text
        rate_info['currency'] = currency
        currency_code = gold_currency_mapping.get(currency, currency)
        rate_info['currency_code'] = currency_code
        forex_buying = data.select_one('td:nth-child(3)').text.replace(".", "").replace(",", ".").replace(' TL', '')
        rate_info['forex_buying'] = forex_buying
        forex_selling = data.select_one('td:nth-child(4)').text.replace(".", "").replace(",", ".").replace(' TL', '')
        rate_info['forex_selling'] = forex_selling
        forex_percent = data.select_one('td:nth-child(5)').text.replace('% ', '').replace(".", "").replace(",", ".")
        rate_info['forex_percent'] = forex_percent
        exchange_rate_dict.append(rate_info)

    # Silver price (from the same page as gold)
    silver_main_search_string = ('body > div:nth-child(12) > div.detMain.borsaMain.goldMain > div.detL > '
                                 'div:nth-child(4) > table > tbody > tr:nth-child(2)')
    data = soup.select_one(silver_main_search_string)
    silver_buy = data.select_one('td:nth-child(3)').text.replace(".", "").replace(",", ".")
    silver_sell = data.select_one('td:nth-child(4)').text.replace(".", "").replace(",", ".")
    silver_exchange_rate = data.select_one('td:nth-child(5)').text.replace('% ', '').replace(".", "").replace(",", ".")
    exchange_rate_dict.append({
        'date': date,
        'currency_code': 'XAG',
        'currency': 'Gümüs',
        'forex_buying': silver_buy,
        'forex_selling': silver_sell,
        'forex_percent': silver_exchange_rate
    })

    # Cryptocurrency prices
    coin_index_list = [2, 3, 8]
    r3 = requests.get("https://uzmanpara.milliyet.com.tr/kripto-paralar/")
    soup = BeautifulSoup(r3.content, "html.parser")
    for i in coin_index_list:
        rate_info = {'date': date}
        main_search_string = f'#myTable > tbody > tr:nth-child({i})'
        data = soup.select_one(main_search_string)
        raw_currency = data.select_one('td.currency > a').text.split(' ')
        currency = raw_currency[0]
        rate_info['currency'] = currency
        currency_code = raw_currency[1].replace('(', '').replace(')', '')
        rate_info['currency_code'] = currency_code
        forex_buying = data.select_one('td:nth-child(2)').text.replace(".", "").replace(",", ".")
        rate_info['forex_buying'] = forex_buying
        forex_selling = data.select_one('td:nth-child(3)').text.replace(".", "").replace(",", ".")
        rate_info['forex_selling'] = forex_selling
        forex_percent = data.select_one('td:nth-child(4)').text.replace('% ', '').replace(".", "").replace(",", ".")
        rate_info['forex_percent'] = forex_percent
        exchange_rate_dict.append(rate_info)
    time.sleep(2)

    # Write the collected rows to Cloud Storage as a CSV file
    bucket_name = "test-bucket"
    dfItem = pd.DataFrame.from_records(exchange_rate_dict)
    time.sleep(2)
    storage_client = storage.Client(project='kartaca-test')
    bucket = storage_client.get_bucket(bucket_name)
    bucket.blob('test-berke/forex_rate.csv').upload_from_string(
        dfItem.to_csv(sep=',', encoding='utf-8', header=False, index=False), 'text/csv')
    time.sleep(2)

    status = "Successful"
    return status
The function above extracts the data and uploads it to the Cloud Storage bucket as a CSV file. The third-party Python libraries it uses must then be added to the “requirements.txt” file, as sketched below.
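A requirements.txt covering the third-party imports used above might look like this (versions are left unpinned here; pin them in a real deployment):

requests
beautifulsoup4
pandas
google-cloud-storage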
After testing the function with “Test Function” and getting successful results, we create the function with the “Deploy” button.
After deployment, the function will be listed on the “Cloud Functions” page.
4) Creating a Job in Cloud Scheduler
After creating the function in Cloud Functions, we need a Cloud Scheduler job to trigger it. To do this, we switch to the Cloud Scheduler page and proceed with the “Create Job” button.
After defining a name for the Scheduler job, we write “*/30 * * * *” in the frequency section because we want the operation to run every half hour, that is, at minutes 0 and 30 of every hour. (Standard crontab syntax is used here.) As for the region, we select “us-central1” to match the function we created in the previous step. You can choose the timezone based on your location.
In the execution configuration, we set the “Target type” parameter to “HTTP”, which reveals additional parameters. We paste the “Trigger URL” of the Cloud Function we created into the URL field; this URL can be found by opening the function in a separate tab and clicking its “Trigger” tab.
We choose “POST” as the HTTP method. For the “Auth header” parameter we select “Add OIDC token” (an ID token is what an HTTP-triggered Cloud Function expects), which reveals two further parameters. We leave the service account as the default Compute Engine service account and enter the function's trigger URL in the “Audience” field.
After completing the information in this section, we can create a Scheduler Job by clicking “Create”.
After the Scheduler job is created, it will be listed in the Cloud Scheduler console. To test it, we can run the job immediately with “Force run” under the “Actions” menu on the right and wait for the function to execute. We can also review the logs on the Cloud Logging page by clicking “View logs” for the job.
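As an alternative to “Force run”, the function can also be invoked directly over HTTP from a local machine. Below is a minimal sketch, assuming a service account credential is available locally (for example via GOOGLE_APPLICATION_CREDENTIALS); the URL is a placeholder, and the audience passed to fetch_id_token must be the function's trigger URL, just as in the Scheduler job.

import requests
import google.auth.transport.requests
import google.oauth2.id_token

# Placeholder URL: replace with your function's actual trigger URL.
url = "https://us-central1-your-project.cloudfunctions.net/crawler"

# Mint an ID token whose audience is the function URL, then call the function with it.
auth_request = google.auth.transport.requests.Request()
token = google.oauth2.id_token.fetch_id_token(auth_request, url)
response = requests.post(url, headers={"Authorization": f"Bearer {token}"})
print(response.status_code, response.text)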
5) BigQuery
After the data file produced by the Cloud Function is written to Cloud Storage, its contents need to be appended to the relevant table in BigQuery so that it can be analyzed with SQL queries when needed. To do this, a dataset and a table matching the data schema must be created in BigQuery.
For this, as the first step, a dataset will be created within the project on BigQuery.
As shown in the image above, you can create a dataset with the “Create dataset” button in the actions menu next to your project name, choosing a dataset ID and region.
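The same dataset can also be created programmatically with the google-cloud-bigquery client. A minimal sketch, using the project and dataset IDs that appear in the function code later in this article:

from google.cloud import bigquery

client = bigquery.Client(project="kartaca-test")
dataset = bigquery.Dataset("kartaca-test.berke_test_exchange")
dataset.location = "US"  # Assumption: multi-region US, matching the bucket location.
client.create_dataset(dataset, exists_ok=True)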
Afterwards, it will be necessary to go to the created dataset and proceed in the same way with the “Create table” button in the action section.
- While creating the table, we select “Google Cloud Storage” in the “Create table from” option and select the uploaded data file.
- In the “Destination” section, the “Project” and “Dataset” sections are already ready since we come from the dataset. We enter the name of our table in the “Table” section.
- In the “Table type” section, we select “Native table”. Native tables are stored and managed by BigQuery itself. External tables, by contrast, point to data that lives outside BigQuery, such as data stored in Bigtable or Cloud Storage; that data is not managed by BigQuery but is kept as a reference and queried in place.
- In the Schema section, the schema must be determined according to the data type of the columns in our data source. An appropriate type should be selected for columns where the data type may change.
- In the “Partition and cluster settings” section, we choose “Partition by ingestion time” for “Partitioning” and “By day” for “Partitioning type”, so the table is split into daily partitions.
- In the “Partitioning filter” section, there is a “Require WHERE clause to query data” option. If enabled, every query against this table must include a “WHERE” condition on the partitioning column, which keeps scans limited to the relevant days; this matters for tables with very large data sizes (see the query sketch after this list).
- After these steps, clicking “Create Table” creates the table under the dataset, and the data from the file we selected in Cloud Storage is loaded into it.
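Once the table exists, queries must include a partition filter whenever the option above is enabled. Below is a minimal sketch of such a query via the Python client, assuming ingestion-time partitioning as configured in the console steps above; the column names come from the table schema used later in this article.

from google.cloud import bigquery

client = bigquery.Client(project="kartaca-test")
# _PARTITIONTIME is the ingestion-time pseudo-column; filtering on it satisfies the
# "Require WHERE clause to query data" setting and limits the scan to today's partition.
sql = """
    SELECT currency_code, forex_buying, forex_selling
    FROM `kartaca-test.berke_test_exchange.forex_rate`
    WHERE _PARTITIONTIME >= TIMESTAMP(CURRENT_DATE())
"""
for row in client.query(sql).result():
    print(row.currency_code, row.forex_buying, row.forex_selling)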
After completing the necessary actions on BigQuery, we need to automate this process. In other words, after the data is written to Cloud Storage, we want it to be added to the table we created in BigQuery automatically. We will use the Cloud Functions service for this.
6) Data Transfer to BigQuery with Cloud Functions
The Cloud Function we create at this stage will read the data file from Cloud Storage and insert its rows into the table under the relevant dataset in BigQuery. This function must run only after new data has been uploaded to Cloud Storage, so we configure it to be triggered whenever data is added.
We return to the Cloud Functions page and proceed with the “Create Function” button.
- We make our “Environment”, “Function name” and “Region” selections as in the previous steps.
- In the “Trigger” section, we click on the “Add Trigger” button at the bottom and select “Cloud Storage Trigger” from the options that open.
- After the selections, click on the “Next” button and move on to the “Code” section. Here, the Python script that reads the file from Cloud Storage and inserts the data into the relevant table in BigQuery needs to be added as a function. We set the entry point to “storage_to_bq”. Required Python libraries should be added to requirements.txt (a sample is sketched after the function code below).
- We can deploy the function after testing with “Test Function”.
- The created function will be listed on the Cloud Functions page.
- Function logs can also be tracked with the Cloud Logging service.
In the “Eventarc trigger” section, we select the bucket our file is written to. The service account chosen here must be granted the necessary Cloud Storage and BigQuery permissions via IAM; if a service account without these permissions is selected, the function will fail with an authorization error.
In main.py, we name the function “storage_to_bq”; this must match the “Entry point” value. Afterwards, we write our Python code into the function.
import json
from google.cloud import storage
from google.cloud import bigquery
import os
import time
def storage_to_bq(data, context):
    project_id = 'kartaca-test'
    dataset_id = 'berke_test_exchange'
    table_id = 'kartaca-test.berke_test_exchange.forex_rate'
    client = bigquery.Client(project=project_id)

    # Load job configuration: CSV source, appended to the partitioned table.
    job_config = bigquery.LoadJobConfig(
        schema=[
            bigquery.SchemaField("date", "DATETIME"),
            bigquery.SchemaField("currency_code", "STRING"),
            bigquery.SchemaField("currency", "STRING"),
            bigquery.SchemaField("forex_buying", "FLOAT"),
            bigquery.SchemaField("forex_selling", "FLOAT"),
            bigquery.SchemaField("forex_percent", "FLOAT"),
        ],
        source_format=bigquery.SourceFormat.CSV,
        write_disposition='WRITE_APPEND',
        time_partitioning=bigquery.TimePartitioning(
            type_=bigquery.TimePartitioningType.DAY,
            field="date",  # Name of the column to use for partitioning.
            expiration_ms=7776000000,  # 90 days.
        ),
    )

    # Load the CSV file written by the crawler function into the table.
    uri = "gs://test-bucket/test-berke/forex_rate.csv"
    load_job = client.load_table_from_uri(uri, table_id, job_config=job_config)
    load_job.result()
    time.sleep(2)
    table = client.get_table(table_id)

    # Refresh the materialized dollar table from the view definition.
    view_id = "kartaca-test.berke_test_exchange.dollar_forex_rate_view"
    view = client.get_table(view_id)
    job_config_dolar = bigquery.QueryJobConfig(destination="kartaca-test.berke_test_exchange.dollar_forex_rate")
    job_config_dolar.write_disposition = 'WRITE_TRUNCATE'
    sql = view.view_query
    query_job = client.query(sql, job_config=job_config_dolar)
    query_job.result()

    return project_id
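The requirements.txt for this function, inferred from its imports, might contain the following (again left unpinned here):

google-cloud-storage
google-cloud-bigquery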
With these operations, our flow is complete. In summary, the Python function collects the information we want from the target website and runs on the Google Cloud Functions service. The Cloud Scheduler service triggers this function periodically via cron. The resulting data file is uploaded to the bucket created in Cloud Storage. To view and analyze the data, we need BigQuery: a second Cloud Function reads the data written to Cloud Storage and inserts it into the relevant BigQuery table, and a Cloud Storage trigger runs this function whenever data is loaded to Cloud Storage. This way, we collect the most up-to-date data from the website throughout the day in our BigQuery table and can run analyses on it.
Once the data is in BigQuery, we can visualize it with different charts by connecting our table to a visualization tool. We have taken a deep dive into data visualization with Looker here.
Author: Berke Azal
Date Published: Nov 30, 2023
