# Google Cloud Bigtable

[Cloud Bigtable](https://cloud.google.com/bigtable/) is a high-performance
storage system that can store and serve training data. This contrib
package contains an experimental integration with TensorFlow.

> **Status: Highly experimental.** The current implementation is very much in
> flux. Please use at your own risk! :-)

The TensorFlow integration with Cloud Bigtable is optimized for common
TensorFlow usage and workloads. It is currently optimized for reading from
Cloud Bigtable at high speed, in particular to feed modern accelerators. For
general-purpose Cloud Bigtable APIs, see the
[official Cloud Bigtable client library documentation][clientdoc].

[clientdoc]: https://cloud.google.com/bigtable/docs/reference/libraries

## Sample Use

There are three main reading styles supported by the `BigtableTable` class:

1. **Reading keys**: Read only the row keys in a table. Keys are returned in
   sorted order from the table. Most key reading operations retrieve all keys
   in a contiguous range; the `sample_keys` operation, however, skips keys and
   operates on the whole table (not a contiguous subset).
2. **Retrieving a row's values**: Given a row key, look up the data associated
   with a defined set of columns. This operation takes advantage of Cloud
   Bigtable's low latency and excellent support for random access.
3. **Scanning ranges**: Given a contiguous range of rows, retrieve both the row
   key and the data associated with a fixed set of columns. This operation
   takes advantage of Cloud Bigtable's high-throughput scans, and is the most
   efficient way to read data.

When using the Cloud Bigtable API, the workflow is:

1. Create a `BigtableClient` object.
2. Use the `BigtableClient` to create `BigtableTable` objects corresponding to
   each table in the Cloud Bigtable instance you would like to access.
3. Call methods on the `BigtableTable` object to create `tf.data.Dataset`s to
   retrieve data.

The following is an example showing how to read all row keys with the prefix
`train-`.

```python
import tensorflow as tf

GCP_PROJECT_ID = '<FILL_ME_IN>'
BIGTABLE_INSTANCE_ID = '<FILL_ME_IN>'
BIGTABLE_TABLE_NAME = '<FILL_ME_IN>'
PREFIX = 'train-'

def main():
  tf.enable_eager_execution()

  client = tf.contrib.cloud.BigtableClient(GCP_PROJECT_ID, BIGTABLE_INSTANCE_ID)
  table = client.table(BIGTABLE_TABLE_NAME)
  dataset = table.keys_by_prefix_dataset(PREFIX)

  print('Retrieving rows:')
  row_index = 0
  for row_key in dataset:
    print('Row key %d: %s' % (row_index, row_key))
    row_index += 1
  print('Finished reading data!')

if __name__ == '__main__':
  main()
```

### Reading row keys

Read only the row keys in a table. Keys are returned in sorted order from the
table. Most key reading operations retrieve all keys in a contiguous range; the
`sample_keys` operation, however, skips keys and operates on the whole table
(not a contiguous subset).

There are three methods to retrieve row keys (a short sketch follows the list):

- `table.keys_by_range_dataset(start, end)`: Retrieve row keys starting with
  `start` and ending with `end`. The range is "half-open", and thus it
  includes `start` if `start` is present in the table. It does not include
  `end`.
- `table.keys_by_prefix_dataset(prefix)`: Retrieves all row keys that start
  with `prefix`. It includes the row key `prefix` if present in the table.
- `table.sample_keys()`: Retrieves a sampling of keys from the underlying
  table. This is often useful in conjunction with parallel scans.
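
For concreteness, here is a minimal sketch that exercises all three methods. It
assumes eager execution and a `table` object created as in the example above;
the row keys (`train-00000`, `train-99999`) and the prefix (`eval-`) are
placeholders rather than keys from a real table.

```python
# Sketch only: the key values and prefix below are hypothetical.
range_keys = table.keys_by_range_dataset('train-00000', 'train-99999')
prefix_keys = table.keys_by_prefix_dataset('eval-')
sampled_keys = table.sample_keys()

for key in range_keys.take(5):
  print(key)  # Row keys in ['train-00000', 'train-99999'), in sorted order.
for key in sampled_keys:
  print(key)  # A sparse sampling of keys drawn from the whole table.
```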

### Reading cell values given a row key

Given a dataset producing row keys, you can use the `table.lookup_columns`
transformation to retrieve values. Example:

```python
key_dataset = tf.data.Dataset.from_tensor_slices([
    'row_key_1',
    'other_row_key',
    'final_row_key',
])
values_dataset = key_dataset.apply(
    table.lookup_columns(('my_column_family', 'column_name'),
                         ('other_cf', 'col')))
training_data = values_dataset.map(my_parsing_function)  # ...
```

### Scanning ranges

Given a contiguous range of rows, retrieve both the row key and the data
associated with a fixed set of columns. Scanning is the most efficient way to
retrieve data from Cloud Bigtable and is thus a very common API for
high-performance data pipelines. To construct a scanning `tf.data.Dataset` from
a `BigtableTable` object, call one of the following methods:

- `table.scan_prefix(prefix, ...)`
- `table.scan_range(start, end, ...)`
- `table.parallel_scan_prefix(prefix, ...)`
- `table.parallel_scan_range(start, end, ...)`

Aside from the specification of the contiguous range of rows, they all take the
following arguments:

- `probability`: (Optional.) A float between 0 (exclusive) and 1 (inclusive).
  A non-1 value indicates to probabilistically sample rows with the provided
  probability.
- `columns`: The columns to read. (See below.)
- `**kwargs`: The columns to read, specified as keyword arguments. (See below.)

In addition, the two parallel operations accept the optional argument
`num_parallel_scans`, which configures the number of parallel Cloud Bigtable
scan operations to run. A reasonable default is automatically chosen for small
Cloud Bigtable clusters. If you have a large cluster, or an extremely demanding
workload, you can tune this value to optimize performance.
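
To make the argument list concrete, here is a minimal sketch of a parallel
scan, assuming a `table` object created as in the earlier example. The prefix,
column family (`cf1`), qualifiers (`col1`, `col2`), sampling rate, and scan
count are all illustrative placeholders.

```python
# Sketch only: 'train-', 'cf1', 'col1', and 'col2' are hypothetical.
ds = table.parallel_scan_prefix(
    'train-',                                    # Contiguous range: every key with this prefix.
    probability=0.5,                             # Probabilistically keep roughly half the rows.
    columns=[('cf1', 'col1'), ('cf1', 'col2')],  # Column family / qualifier pairs to read.
    num_parallel_scans=8)                        # Number of parallel Cloud Bigtable scans.
# Elements are (row key, cf1:col1, cf1:col2) string tuples, as described below.
```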

#### Specifying columns to read when scanning

All of the scan operations allow you to specify the column family and columns
in the same ways.

##### Using `columns`

The first way to specify the data to read is via the `columns` parameter. The
value should be a tuple (or list of tuples) of strings. The first string in the
tuple is the column family, and the second string in the tuple is the column
qualifier.

##### Using `**kwargs`

The second way to specify the data to read is via the `**kwargs` parameter,
which you can use to specify keyword arguments corresponding to the columns that
you want to read. The keyword to use is the column family name, and the argument
value should be either a string, or a tuple of strings, specifying the column
qualifiers (column names).

Although using `**kwargs` has the advantage of requiring less typing, it is not
future-proof in all cases. (If we add a new parameter to the scan functions that
has the same name as your column family, your code will break.)

##### Examples

Below are two equivalent snippets showing how to specify which columns to read:

```python
ds1 = table.scan_range("row_start", "row_end", columns=[("cfa", "c1"),
                                                        ("cfa", "c2"),
                                                        ("cfb", "c3")])
ds2 = table.scan_range("row_start", "row_end", cfa=["c1", "c2"], cfb="c3")
```

In this example, we are reading 3 columns from a total of 2 column families.
From the `cfa` column family, we are reading columns `c1` and `c2`. From the
second column family (`cfb`), we are reading `c3`. Both `ds1` and `ds2` will
output elements of the following types: (`tf.string`, `tf.string`, `tf.string`,
`tf.string`). The first `tf.string` is the row key, the second `tf.string` is
the latest data in cell `cfa:c1`, the third corresponds to `cfa:c2`, and the
final one is `cfb:c3`.
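
Because each element is a flat tuple of strings, a `map` over the scan dataset
receives one argument per component. The sketch below assumes `ds1` from the
snippet above; the parsing step itself is hypothetical.

```python
# Hypothetical parsing step: each scan element unpacks into the row key plus
# one string per requested column, in the order the columns were specified.
def parse_row(row_key, c1, c2, c3):
  # Decode the raw cell bytes however your data was written; here we simply
  # return the values unchanged alongside the key.
  return row_key, (c1, c2, c3)

parsed = ds1.map(parse_row)
```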

#### Determinism when scanning

While the non-parallel scan operations are fully deterministic, the parallel
scan operations are not. If you would like to scan in parallel without losing
determinism, you can build up the `parallel_interleave` yourself. As an
example, say we want to scan all rows between `training_data_00000` and
`training_data_90000`; we can use the following code snippet:

```python
table = # ...
columns = [('cf1', 'col1'), ('cf1', 'col2')]
NUM_PARALLEL_READS = # ...
ds = tf.data.Dataset.range(9).shuffle(10)
def interleave_fn(index):
  # Given a starting index, create 2 strings to be the start and end keys.
  start_idx = index
  end_idx = index + 1
  start_idx_str = tf.as_string(start_idx * 10000, width=5, fill='0')
  end_idx_str = tf.as_string(end_idx * 10000, width=5, fill='0')
  start = tf.string_join(['training_data_', start_idx_str])
  end = tf.string_join(['training_data_', end_idx_str])
  return table.scan_range(start, end, columns=columns)
ds = ds.apply(tf.data.experimental.parallel_interleave(
    interleave_fn, cycle_length=NUM_PARALLEL_READS, prefetch_input_elements=1))
```

> Note: You should divide up the key range into more sub-ranges for increased
> parallelism.

## Writing to Cloud Bigtable

In order to simplify getting started, this package provides basic support for
writing data into Cloud Bigtable.

> Note: The implementation is not optimized for performance! Please consider
> using alternative frameworks such as Apache Beam / Cloud Dataflow for
> production workloads.

Below is an example showing how to write a trivial dataset into Cloud Bigtable.

```python
import tensorflow as tf

GCP_PROJECT_ID = '<FILL_ME_IN>'
BIGTABLE_INSTANCE_ID = '<FILL_ME_IN>'
BIGTABLE_TABLE_NAME = '<FILL_ME_IN>'
COLUMN_FAMILY = '<FILL_ME_IN>'
COLUMN_QUALIFIER = '<FILL_ME_IN>'

def make_dataset():
  """Makes a dataset to write to Cloud Bigtable."""
  return tf.data.Dataset.from_tensor_slices([
      'training_data_1',
      'training_data_2',
      'training_data_3',
  ])

def make_row_key_dataset():
  """Makes a dataset of strings used for row keys.

  The strings are of the form: `fake-data-` followed by a sequential counter.
  For example, this dataset would contain the following elements:

   - fake-data-00000001
   - fake-data-00000002
   - ...
   - fake-data-23498103
  """
  counter_dataset = tf.data.experimental.Counter()
  width = 8
  row_key_prefix = 'fake-data-'
  ds = counter_dataset.map(lambda index: tf.as_string(index,
                                                      width=width,
                                                      fill='0'))
  ds = ds.map(lambda idx_str: tf.string_join([row_key_prefix, idx_str]))
  return ds


def main():
  client = tf.contrib.cloud.BigtableClient(GCP_PROJECT_ID, BIGTABLE_INSTANCE_ID)
  table = client.table(BIGTABLE_TABLE_NAME)
  dataset = make_dataset()
  index_dataset = make_row_key_dataset()
  aggregate_dataset = tf.data.Dataset.zip((index_dataset, dataset))
  write_op = table.write(aggregate_dataset, column_families=[COLUMN_FAMILY],
                         columns=[COLUMN_QUALIFIER])

  with tf.Session() as sess:
    print('Starting transfer.')
    sess.run(write_op)
    print('Transfer complete.')

if __name__ == '__main__':
  main()
```

## Sample applications and architectures

While most machine learning applications are well served by a high-performance
distributed file system, there are certain applications where using Cloud
Bigtable works extremely well.

### Perfect Shuffling

Normally, training data is stored in flat files, and a combination of
(1) `tf.data.Dataset.interleave` (or `parallel_interleave`), (2)
`tf.data.Dataset.shuffle`, and (3) writing the data in an unsorted order in the
data files in the first place, provides enough randomization to ensure models
train efficiently. However, if you would like perfect shuffling, you can use
Cloud Bigtable's low-latency random access capabilities. Create a
`tf.data.Dataset` that generates the keys in a perfectly random order (or read
all the keys into memory and use a shuffle buffer sized to fit all of them for a
perfect random shuffle using `tf.data.Dataset.shuffle`), and then use
`lookup_columns` to retrieve the training data.
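
As an illustration, here is a minimal sketch of the second approach: read every
key up front, shuffle with a buffer large enough to hold all of them, and then
look up the training data. It assumes a `table` object as in the earlier
examples; the prefix (`train-`), column (`cf1:data`), `NUM_KEYS` bound, and
`my_parsing_function` are placeholders.

```python
# Sketch only: 'train-', 'cf1', 'data', and NUM_KEYS are hypothetical.
NUM_KEYS = 1000000  # Assumed upper bound on the number of row keys.

keys = table.keys_by_prefix_dataset('train-')
keys = keys.shuffle(buffer_size=NUM_KEYS)  # Buffer holds every key: a perfect shuffle.
examples = keys.apply(table.lookup_columns(('cf1', 'data')))
training_data = examples.map(my_parsing_function)  # As in the lookup example above.
```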

### Distributed Reinforcement Learning

Sophisticated reinforcement learning algorithms are commonly trained across a
distributed cluster. (See [IMPALA by DeepMind][impala].) One part of the cluster
runs self-play, while the other part of the cluster learns a new version of the
model based on the training data generated by self-play. The new model version
is then distributed to the self-play half of the cluster, and new training data
is generated to continue the cycle.

In such a configuration, because there is value in training on the freshest
examples, a storage service like Cloud Bigtable can be used to store and
serve the generated training data. When using Cloud Bigtable, there is no need
to aggregate the examples into large batch files; instead, the examples can be
written as soon as they are generated, and then retrieved at high speed.

[impala]: https://arxiv.org/abs/1802.01561

## Common Gotchas!

### gRPC Certificates

If you encounter a log line that includes the following:

```
"description":"Failed to load file", [...],
"filename":"/usr/share/grpc/roots.pem"
```

you can solve it via either of the following approaches:

* copy the [gRPC `roots.pem` file][grpcPem] to
  `/usr/share/grpc/roots.pem` on your local machine, which is the default
  location where gRPC will look for this file
* export the environment variable `GRPC_DEFAULT_SSL_ROOTS_FILE_PATH` to point to
  the full path of the gRPC `roots.pem` file on your file system if it's in a
  different location

[grpcPem]: https://github.com/grpc/grpc/blob/master/etc/roots.pem
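
If you prefer to keep this configuration inside your training script, one
option is to set the environment variable from Python before the Bigtable
client (and therefore the underlying secure gRPC channel) is created. This is a
sketch under the assumption that gRPC reads `GRPC_DEFAULT_SSL_ROOTS_FILE_PATH`
when the first secure channel is established; the path shown is a placeholder.

```python
import os

# Hypothetical path; point this at your local copy of the gRPC roots.pem file.
os.environ['GRPC_DEFAULT_SSL_ROOTS_FILE_PATH'] = '/path/to/roots.pem'

import tensorflow as tf  # Import after setting the variable, to be safe.

GCP_PROJECT_ID = '<FILL_ME_IN>'
BIGTABLE_INSTANCE_ID = '<FILL_ME_IN>'
client = tf.contrib.cloud.BigtableClient(GCP_PROJECT_ID, BIGTABLE_INSTANCE_ID)
```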

### Permission denied errors

The TensorFlow Cloud Bigtable client will search for credentials to use in the
process's environment. It will use the first credentials it finds if multiple
are available.

- **Compute Engine**: When running on Compute Engine, the client will often use
  the service account from the virtual machine's metadata service. Be sure to
  authorize your Compute Engine VM to have access to the Cloud Bigtable service
  when creating your VM, or [update the VM's scopes][update-vm-scopes] on a
  running VM if you run into this issue.
- **Cloud TPU**: Your Cloud TPUs run with the designated Cloud TPU service
  account dedicated to your GCP project. Ensure the service account has been
  authorized via the Cloud Console to access your Cloud Bigtable instances.

[update-vm-scopes]: https://cloud.google.com/compute/docs/access/create-enable-service-accounts-for-instances#changeserviceaccountandscopes