
There are many articles available today that explain how to send data from Salesforce to Google BigQuery. What they often fail to explain, however, is how to send BigQuery data to Salesforce and then log the response back into a BigQuery table for further analysis. In this blog, we show how you can automate this process in a simple manner without using expensive third-party tools.

Source data is often sent in batches. When there is a massive amount of data, it is quite possible that there is a difference between the number of records sent from the source and the number of records received at the target. It is not viable to manually check if the number of records sent from BigQuery matches the row count in Salesforce. We can simplify this using a basic Python script and Apache Airflow.

Sending Data from BigQuery to Salesforce

To start the process, import the Salesforce class from the simple_salesforce module in your Python script. ‘Simple Salesforce’ is a REST API client built for Python.

  1. Connect to Salesforce
  2. Log in to Salesforce by creating a connection object with your username, password, and security token details.

    from simple_salesforce import Salesforce 
    
    sf = Salesforce(username='', password='', security_token='')

  3. Send Data from BigQuery to Salesforce 
    • Select the fields corresponding to the Salesforce object from the BigQuery table. The snippet below assumes a BigQuery cursor has already been created; the consolidated sketch after this list shows one way to set it up.
    • query = "SELECT field1, field2, field3 FROM project_id.dataset_id.table_name"

      cursor.execute(query)

      result_set = cursor.fetchall()

    • Change the result set to a data frame
    • contactdf = pd.DataFrame(result_set, columns=['AccountNumber__c', 'AccountStatus__c', 'Account_Id__c'])

      The column names in the above example are related to the Salesforce Contact object. Similarly, populate as many columns of this object as required.

      Ensure that the order of the columns queried from BigQuery matches the order of the columns defined in the data frame.

    • Convert data frame to JSON 
    • JSON conversion is required since Salesforce accepts data in JSON format.

      jsondf = contactdf.to_json(orient='records')
      
      contact_json = json.loads(jsondf)

    • Load Contact JSON data to Salesforce 
    • x = sf.bulk.Contact.upsert(contact_json, 'Account_Id__c', batch_size=1000, use_serial=True) 

      There are other Salesforce objects, such as Address and Opportunity, for which data can be inserted in a similar fashion.
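Putting the steps above together, the following is a minimal end-to-end sketch of the push from BigQuery to Salesforce. It assumes the cursor comes from the BigQuery DB-API (google.cloud.bigquery.dbapi); the project, dataset, table, and field names are placeholders, not names from the original pipeline.

from simple_salesforce import Salesforce
from google.cloud import bigquery
from google.cloud.bigquery import dbapi
import pandas as pd
import json

# Salesforce connection (fill in your credentials)
sf = Salesforce(username='', password='', security_token='')

# BigQuery DB-API cursor (assumes default Google Cloud credentials)
cursor = dbapi.connect(bigquery.Client()).cursor()

# Pull the source rows from BigQuery (placeholder names)
query = "SELECT field1, field2, field3 FROM project_id.dataset_id.table_name"
cursor.execute(query)
result_set = cursor.fetchall()

# Build a data frame with Salesforce API field names as columns
contactdf = pd.DataFrame(result_set,
                         columns=['AccountNumber__c', 'AccountStatus__c', 'Account_Id__c'])

# Convert to JSON records and bulk upsert on the external ID field
contact_json = json.loads(contactdf.to_json(orient='records'))
result = sf.bulk.Contact.upsert(contact_json, 'Account_Id__c',
                                batch_size=1000, use_serial=True)

The bulk upsert returns a per-record result list, which can also be inspected for failed rows.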

Retrieving Data from Salesforce 

Here, data is accessed using SOQL (Salesforce Object Query Language), a SQL-like language used to query data from Salesforce.

First, collect the list of IDs for which data needs to be fetched from Salesforce. Use a unique column in Salesforce for this purpose.

contactdf_for_sf = contactdf['']  # unique column used to identify records in Salesforce

IdList = contactdf_for_sf.tolist()
 

Fetching Data with SOQL

sf.query_all is a method from Simple Salesforce to fetch data using SOQL. In the current scenario, the required columns are pulled from Salesforce record by record, converted to a data frame, and appended to a combined data frame.

Appended_df = pd.DataFrame(columns=['Id', 'npe01__WorkEmail__c', 'AccountId'])

for Id in IdList:

    sf_contact_data = sf.query_all(
        "SELECT Id, npe01__WorkEmail__c, AccountId FROM Contact "
        "WHERE npe01__WorkEmail__c = '%s'" % Id)

    df = pd.DataFrame(sf_contact_data['records'])

    Appended_df = Appended_df.append(df)


Loading Response Data from Salesforce to BigQuery 

The appended data frame, which holds the response from Salesforce, is loaded into a temporary BigQuery table. This temporary table is then used as the source table to update the target BigQuery table. This update makes it possible to identify the records that were not received by Salesforce.

Now, check for the records that do not have the response column updated in the BigQuery target table and send those records to Salesforce again for processing.

Appended_df.to_gbq(gbq_table_name, project_id, chunksize=None,
                   if_exists='replace')
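The update from the temporary table to the target table, and the check for records that did not get a response, can then be done with queries against BigQuery. Below is a minimal sketch; the table names, join key, and response column (sf_contact_id) are illustrative placeholders, not names from the original pipeline.

from google.cloud import bigquery

client = bigquery.Client()

# Mark target rows that came back in the Salesforce response
# (table names, join key, and response column are placeholders)
update_sql = """
UPDATE `project_id.dataset_id.target_table` AS t
SET t.sf_contact_id = s.Id
FROM `project_id.dataset_id.temp_sf_response` AS s
WHERE t.Account_Id__c = s.AccountId
"""
client.query(update_sql).result()

# Rows still missing a response can be re-sent to Salesforce for processing
resend_sql = """
SELECT field1, field2, field3
FROM `project_id.dataset_id.target_table`
WHERE sf_contact_id IS NULL
"""
resend_rows = client.query(resend_sql).to_dataframe()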

Automating the Process Using Airflow

You can use Apache Airflow to run this Python script automatically on a schedule. Follow the steps below to deploy the code and automate it; a minimal DAG sketch follows the list.

  • Import the necessary Python libraries as a first step 
  • Create a DAG (Directed Acyclic Graph) object and the required variables - include the “schedule_interval” argument in this step so the DAG runs automatically 
  • Add your tasks, wrapping the data transfer script discussed above in one of them 
  • Define the dependencies, i.e., the order of execution of the tasks 
  • Put this code into a file, “sample.py” 
  • Put the “sample.py” file in the dags/ folder of Airflow 
  • See your DAG in action! 
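
As a reference, here is a minimal DAG sketch following the steps above. The module bq_sf_sync and its functions push_to_salesforce and load_response_to_bq are hypothetical placeholders for the transfer script discussed earlier.

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

# Hypothetical module wrapping the BigQuery <-> Salesforce transfer script
from bq_sf_sync import push_to_salesforce, load_response_to_bq

with DAG(
    dag_id='bq_salesforce_sync',
    start_date=datetime(2023, 1, 1),
    schedule_interval='@daily',   # runs the DAG automatically once a day
    catchup=False,
) as dag:

    push_task = PythonOperator(
        task_id='push_bq_data_to_salesforce',
        python_callable=push_to_salesforce,
    )

    response_task = PythonOperator(
        task_id='load_sf_response_to_bigquery',
        python_callable=load_response_to_bq,
    )

    # Order of execution: push to Salesforce first, then log the response
    push_task >> response_task

Save this code as “sample.py” in the dags/ folder and Airflow will pick it up automatically.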

Conclusion

This simple script saves a lot of manual work, such as cross-checking data between BigQuery and Salesforce. As you can see, a few simple lines of code coupled with an SOQL query get the job done automatically, without the need for expensive third-party solutions.