Home | History | Annotate | Download | only in app_engine
      1 
      2 # Copyright 2016 Google Inc.
      3 #
      4 # Licensed under the Apache License, Version 2.0 (the "License");
      5 # you may not use this file except in compliance with the License.
      6 # You may obtain a copy of the License at
      7 #
      8 #     http://www.apache.org/licenses/LICENSE-2.0
      9 #
     10 # Unless required by applicable law or agreed to in writing, software
     11 # distributed under the License is distributed on an "AS IS" BASIS,
     12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13 # See the License for the specific language governing permissions and
     14 # limitations under the License.
     15 
     16 import logging
     17 from google import protobuf
     18 from gcloud import bigtable
     19 
     20 _COLUMN_FAMILY_ID = 'cf1'
     21 
     22 
     23 class BigTableClient(object):
     24     """Defines the big table client that connects to the big table.
     25 
     26     Attributes:
     27         _column_family_id: A String for family of columns.
     28         _client: An instance of Client which is project specific.
     29         _client_instance: Representation of a Google Cloud Bigtable Instance.
     30         _start_index: Start index for the row key. It gets incremented as we
     31             dequeue.
     32         _end_index : End index for row key. This is incremented as we Enqueue.
     33         _table_name: A string that represents the big table.
     34         _table_instance: An instance of the Table that represents the big table.
     35     """
     36 
     37     def __init__(self, table, project_id):
     38         self._column_family_id = _COLUMN_FAMILY_ID
     39         self._client = bigtable.Client(project=project_id, admin=True)
     40         self._client_instance = None
     41         self._start_index = 0
     42         self._end_index = 0
     43         self._table_name = table
     44         self._table_instance = None
     45         # Start client to enable receiving requests
     46         self.StartClient()
     47 
     48     def StartClient(self, instance_id):
     49         """Starts client to prepare it to make requests."""
     50 
     51         # Start the client
     52         if not self._client.is_started():
     53             self._client.start()
     54         self._client_instance = self._client.instance(instance_id)
     55         if self._table_instance is None:
     56             self._table_instance = self._client_instance.table(self._table_name)
     57 
     58     def StopClient(self):
     59         """Stop client to close all the open gRPC clients."""
     60 
     61         # stop client
     62         self._client.stop()
     63 
     64     def CreateTable(self):
     65         """Creates a table in which read/write operations are performed.
     66 
     67         Raises:
     68             AbortionError: Error occurred when creating table is not successful.
     69                 This could be due to creating a table with a duplicate name.
     70         """
     71 
     72         # Create a table
     73         logging.debug('Creating the table %s', self._table_name)
     74 
     75         self._table_instance.create()
     76         cf1 = self._table_instance.column_family(self._column_family_id)
     77         cf1.create()
     78 
     79     def Enqueue(self, messages, column_id):
     80         """Writes new rows to the given table.
     81 
     82         Args:
     83             messages: An array of strings that represents the message to be
     84                 written to a new row in the table. Each message is writte to a
     85                 new row
     86             column_id: A string that represents the name of the column to which
     87                 data is to be written.
     88         """
     89 
     90         # Start writing rows
     91         logging.debug('Writing to the table : %s, column : %s', self._table_name,
     92                       column_id)
     93         for value in messages:
     94             row_key = str(self._end_index)
     95             self._end_index = self._end_index + 1
     96             row = self._table_instance.row(row_key)
     97             row.set_cell(self._column_family_id, column_id.encode('utf-8'),
     98                          value.encode('utf-8'))
     99             row.commit()
    100         # End writing rows
    101 
    102     def Dequeue(self):
    103         """Removes and returns the first row from the table.
    104 
    105         Returns:
    106             row: A row object that represents the top most row.
    107         """
    108 
    109         if self._end_index < self._start_index:
    110             return
    111 
    112         logging.info('Getting a single row by row key.')
    113         key = str(self._start_index)
    114         row_cond = self._table_instance.row(key)
    115         top_row = row_cond
    116         row_cond.delete()
    117         self._start_index = self._start_index + 1
    118 
    119         return top_row
    120 
    121     def DeleteTable(self):
    122         """Performs delete operation for a given table."""
    123 
    124         # Delete the table
    125         logging.debug('Deleting the table : %s', self._table_name)
    126         self._table_instance.delete()
    127