PHOENIX-4625 memory leak in PhoenixConnection if scanner renew lease thread is not...
[phoenix.git] / phoenix-core / src / main / java / org / apache / phoenix / jdbc / PhoenixConnection.java
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18 package org.apache.phoenix.jdbc;
19
20 import static com.google.common.base.Preconditions.checkNotNull;
21 import static java.util.Collections.emptyMap;
22 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_OPEN_PHOENIX_CONNECTIONS;
23 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER;
24
25 import java.io.EOFException;
26 import java.io.IOException;
27 import java.io.PrintStream;
28 import java.io.Reader;
29 import java.lang.ref.WeakReference;
30 import java.sql.Array;
31 import java.sql.Blob;
32 import java.sql.CallableStatement;
33 import java.sql.Clob;
34 import java.sql.Connection;
35 import java.sql.DatabaseMetaData;
36 import java.sql.NClob;
37 import java.sql.ParameterMetaData;
38 import java.sql.PreparedStatement;
39 import java.sql.ResultSet;
40 import java.sql.ResultSetMetaData;
41 import java.sql.SQLClientInfoException;
42 import java.sql.SQLException;
43 import java.sql.SQLFeatureNotSupportedException;
44 import java.sql.SQLWarning;
45 import java.sql.SQLXML;
46 import java.sql.Savepoint;
47 import java.sql.Statement;
48 import java.sql.Struct;
49 import java.text.Format;
50 import java.util.ArrayList;
51 import java.util.Collections;
52 import java.util.HashMap;
53 import java.util.List;
54 import java.util.Map;
55 import java.util.Properties;
56 import java.util.concurrent.Executor;
57 import java.util.concurrent.LinkedBlockingQueue;
58
59 import javax.annotation.Nonnull;
60 import javax.annotation.Nullable;
61
62 import org.apache.hadoop.hbase.HConstants;
63 import org.apache.hadoop.hbase.client.Consistency;
64 import org.apache.htrace.Sampler;
65 import org.apache.htrace.TraceScope;
66 import org.apache.phoenix.call.CallRunner;
67 import org.apache.phoenix.exception.SQLExceptionCode;
68 import org.apache.phoenix.exception.SQLExceptionInfo;
69 import org.apache.phoenix.execute.CommitException;
70 import org.apache.phoenix.execute.MutationState;
71 import org.apache.phoenix.expression.function.FunctionArgumentType;
72 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
73 import org.apache.phoenix.iterate.DefaultTableResultIteratorFactory;
74 import org.apache.phoenix.iterate.ParallelIteratorFactory;
75 import org.apache.phoenix.iterate.TableResultIterator;
76 import org.apache.phoenix.iterate.TableResultIteratorFactory;
77 import org.apache.phoenix.jdbc.PhoenixStatement.PhoenixStatementParser;
78 import org.apache.phoenix.monitoring.MetricType;
79 import org.apache.phoenix.parse.PFunction;
80 import org.apache.phoenix.parse.PSchema;
81 import org.apache.phoenix.query.ConnectionQueryServices;
82 import org.apache.phoenix.query.ConnectionQueryServices.Feature;
83 import org.apache.phoenix.query.DelegateConnectionQueryServices;
84 import org.apache.phoenix.query.MetaDataMutated;
85 import org.apache.phoenix.query.PropertyPolicyProvider;
86 import org.apache.phoenix.query.QueryConstants;
87 import org.apache.phoenix.query.QueryServices;
88 import org.apache.phoenix.query.QueryServicesOptions;
89 import org.apache.phoenix.schema.PColumn;
90 import org.apache.phoenix.schema.PMetaData;
91 import org.apache.phoenix.schema.PMetaData.Pruner;
92 import org.apache.phoenix.schema.PName;
93 import org.apache.phoenix.schema.PTable;
94 import org.apache.phoenix.schema.PTableKey;
95 import org.apache.phoenix.schema.PTableRef;
96 import org.apache.phoenix.schema.PTableType;
97 import org.apache.phoenix.schema.SchemaNotFoundException;
98 import org.apache.phoenix.schema.TableNotFoundException;
99 import org.apache.phoenix.schema.types.PArrayDataType;
100 import org.apache.phoenix.schema.types.PDataType;
101 import org.apache.phoenix.schema.types.PDate;
102 import org.apache.phoenix.schema.types.PDecimal;
103 import org.apache.phoenix.schema.types.PTime;
104 import org.apache.phoenix.schema.types.PTimestamp;
105 import org.apache.phoenix.schema.types.PUnsignedDate;
106 import org.apache.phoenix.schema.types.PUnsignedTime;
107 import org.apache.phoenix.schema.types.PUnsignedTimestamp;
108 import org.apache.phoenix.schema.types.PVarbinary;
109 import org.apache.phoenix.trace.util.Tracing;
110 import org.apache.phoenix.transaction.PhoenixTransactionContext;
111 import org.apache.phoenix.util.DateUtil;
112 import org.apache.phoenix.util.JDBCUtil;
113 import org.apache.phoenix.util.NumberUtil;
114 import org.apache.phoenix.util.PhoenixRuntime;
115 import org.apache.phoenix.util.PropertiesUtil;
116 import org.apache.phoenix.util.ReadOnlyProps;
117 import org.apache.phoenix.util.SQLCloseable;
118 import org.apache.phoenix.util.SQLCloseables;
119 import org.apache.phoenix.util.SchemaUtil;
120 import org.apache.phoenix.util.VarBinaryFormatter;
121
122 import com.google.common.annotations.VisibleForTesting;
123 import com.google.common.base.Objects;
124 import com.google.common.base.Strings;
125 import com.google.common.collect.ImmutableMap;
126 import com.google.common.collect.ImmutableMap.Builder;
127 import com.google.common.collect.Lists;
128
129 /**
130 *
131 * JDBC Connection implementation of Phoenix. Currently the following are
132 * supported: - Statement - PreparedStatement The connection may only be used
133 * with the following options: - ResultSet.TYPE_FORWARD_ONLY -
134 * Connection.TRANSACTION_READ_COMMITTED
135 *
136 *
137 * @since 0.1
138 */
139 public class PhoenixConnection implements Connection, MetaDataMutated, SQLCloseable {
140 private final String url;
141 private String schema;
142 private final ConnectionQueryServices services;
143 private final Properties info;
144 private final Map<PDataType<?>, Format> formatters = new HashMap<>();
145 private final int mutateBatchSize;
146 private final long mutateBatchSizeBytes;
147 private final Long scn;
148 private final boolean buildingIndex;
149 private MutationState mutationState;
150 private List<PhoenixStatement> statements = new ArrayList<>();
151 private boolean isAutoFlush = false;
152 private boolean isAutoCommit = false;
153 private PMetaData metaData;
154 private final PName tenantId;
155 private final String datePattern;
156 private final String timePattern;
157 private final String timestampPattern;
158 private int statementExecutionCounter;
159 private TraceScope traceScope = null;
160 private volatile boolean isClosed = false;
161 private Sampler<?> sampler;
162 private boolean readOnly = false;
163 private Consistency consistency = Consistency.STRONG;
164 private Map<String, String> customTracingAnnotations = emptyMap();
165 private final boolean isRequestLevelMetricsEnabled;
166 private final boolean isDescVarLengthRowKeyUpgrade;
167 private ParallelIteratorFactory parallelIteratorFactory;
168 private final LinkedBlockingQueue<WeakReference<TableResultIterator>> scannerQueue;
169 private TableResultIteratorFactory tableResultIteratorFactory;
170 private boolean isRunningUpgrade;
171
// Runs once when the class is initialized: invokes Tracing.addTraceMetricsSource().
static {
    Tracing.addTraceMetricsSource();
}
175
176 private static Properties newPropsWithSCN(long scn, Properties props) {
177 props = new Properties(props);
178 props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(scn));
179 return props;
180 }
181
/**
 * Creates a connection from an existing one. The source connection's query
 * services, URL, client info, metadata, mutation state and buildingIndex
 * flag are handed to the primary constructor together with the supplied
 * upgrade flags; afterwards the auto-commit, auto-flush, sampler and
 * statement execution counter of the source are copied over.
 *
 * @param connection               the connection to derive from
 * @param isDescRowKeyOrderUpgrade passed through to the primary constructor
 * @param isRunningUpgrade         passed through to the primary constructor
 * @throws SQLException if the primary constructor fails
 */
public PhoenixConnection(PhoenixConnection connection,
        boolean isDescRowKeyOrderUpgrade, boolean isRunningUpgrade)
        throws SQLException {
    this(connection.getQueryServices(),
            connection.getURL(),
            connection.getClientInfo(),
            connection.metaData,
            connection.getMutationState(),
            isDescRowKeyOrderUpgrade,
            isRunningUpgrade,
            connection.buildingIndex);
    this.sampler = connection.sampler;
    this.statementExecutionCounter = connection.statementExecutionCounter;
    this.isAutoFlush = connection.isAutoFlush;
    this.isAutoCommit = connection.isAutoCommit;
}
194
/**
 * Creates a connection from an existing one, reusing that connection's
 * descending-row-key-upgrade and running-upgrade flags.
 *
 * @param connection the connection to derive from
 * @throws SQLException if the delegated constructor fails
 */
public PhoenixConnection(PhoenixConnection connection) throws SQLException {
    this(connection,
            connection.isDescVarLengthRowKeyUpgrade(),
            connection.isRunningUpgrade());
}
199
/**
 * Creates a connection from an existing one but with the supplied mutation
 * state instead of the source connection's own. Metadata is taken from
 * {@link #getMetaDataCache()} of the source connection.
 *
 * @param connection    the connection to derive from
 * @param mutationState the mutation state handed to the primary constructor
 * @throws SQLException if the primary constructor fails
 */
public PhoenixConnection(PhoenixConnection connection,
        MutationState mutationState) throws SQLException {
    this(connection.getQueryServices(),
            connection.getURL(),
            connection.getClientInfo(),
            connection.getMetaDataCache(),
            mutationState,
            connection.isDescVarLengthRowKeyUpgrade(),
            connection.isRunningUpgrade(),
            connection.buildingIndex);
}
207
/**
 * Creates a connection from an existing one at the given SCN, using the
 * source connection's own query services.
 *
 * @param connection the connection to derive from
 * @param scn        the SCN for the new connection
 * @throws SQLException if the delegated constructor fails
 */
public PhoenixConnection(PhoenixConnection connection, long scn)
        throws SQLException {
    this(connection.getQueryServices(), connection, scn);
}
212
/**
 * Creates a connection from an existing one at the given SCN and on the
 * given query services. The client info passed on is the source connection's
 * client info with {@link PhoenixRuntime#CURRENT_SCN_ATTRIB} set to
 * {@code scn}; afterwards the auto-commit, auto-flush, sampler and statement
 * execution counter of the source are copied over.
 *
 * @param services   the query services for the new connection
 * @param connection the connection to derive from
 * @param scn        the SCN for the new connection
 * @throws SQLException if the primary constructor fails
 */
public PhoenixConnection(ConnectionQueryServices services,
        PhoenixConnection connection, long scn) throws SQLException {
    this(services,
            connection.getURL(),
            newPropsWithSCN(scn, connection.getClientInfo()),
            connection.metaData,
            connection.getMutationState(),
            connection.isDescVarLengthRowKeyUpgrade(),
            connection.isRunningUpgrade(),
            connection.buildingIndex);
    this.sampler = connection.sampler;
    this.statementExecutionCounter = connection.statementExecutionCounter;
    this.isAutoFlush = connection.isAutoFlush;
    this.isAutoCommit = connection.isAutoCommit;
}
224
/**
 * Creates a connection with no initial mutation state and with the
 * descending-row-key-upgrade, running-upgrade and building-index flags all
 * {@code false}.
 *
 * @param services the query services backing the connection
 * @param url      the JDBC URL
 * @param info     the connection properties
 * @param metaData the metadata cache for the connection
 * @throws SQLException if the primary constructor fails
 */
public PhoenixConnection(ConnectionQueryServices services, String url,
        Properties info, PMetaData metaData) throws SQLException {
    this(services, url, info, metaData,
            null,   // mutationState
            false,  // isDescVarLengthRowKeyUpgrade
            false,  // isRunningUpgrade
            false); // buildingIndex
}
229
/**
 * Creates a connection that takes its URL, metadata and upgrade /
 * building-index flags from {@code connection}, but uses the supplied query
 * services and properties and starts with no initial mutation state.
 *
 * @param connection the connection to derive from
 * @param services   the query services for the new connection
 * @param info       the connection properties for the new connection
 * @throws SQLException if the primary constructor fails
 */
public PhoenixConnection(PhoenixConnection connection,
        ConnectionQueryServices services, Properties info)
        throws SQLException {
    this(services,
            connection.url,
            info,
            connection.metaData,
            null,
            connection.isDescVarLengthRowKeyUpgrade(),
            connection.isRunningUpgrade(),
            connection.buildingIndex);
}
237
/**
 * Primary constructor; every public constructor delegates here.
 *
 * <p>It copies the supplied properties, resolves the query services (a
 * tenant-specific child and/or a delegate exposing the augmented
 * properties), validates the SCN / build-index timestamp, reads the
 * connection settings, sets up the formatters, prunes the metadata cache
 * and finally registers the connection with the query services.
 *
 * <p>Registration via {@code services.addConnection(this)} is deliberately
 * the last initialization step: it hands {@code this} to code outside the
 * constructor, so it must not happen before every field (in particular the
 * final {@code scannerQueue}) has been assigned.
 *
 * @param services                     the query services backing the connection
 * @param url                          the JDBC URL
 * @param info                         the connection properties
 * @param metaData                     the metadata cache; pruned in place
 * @param mutationState                state to copy, or {@code null} to create a fresh one
 * @param isDescVarLengthRowKeyUpgrade stored as-is
 * @param isRunningUpgrade             stored as-is
 * @param buildingIndex                forced to {@code true} when a build-index SCN is supplied
 * @throws SQLException if the SCN / build-index timestamp is invalid or
 *                      registering the connection fails
 */
private PhoenixConnection(ConnectionQueryServices services, String url,
        Properties info, PMetaData metaData, MutationState mutationState,
        boolean isDescVarLengthRowKeyUpgrade, boolean isRunningUpgrade,
        boolean buildingIndex) throws SQLException {
    GLOBAL_PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER.increment();
    this.url = url;
    this.isDescVarLengthRowKeyUpgrade = isDescVarLengthRowKeyUpgrade;

    // Filter user provided properties based on property policy, if
    // provided.
    PropertyPolicyProvider.getPropertyPolicy().evaluate(info);

    // Copy so client cannot change
    this.info = info == null ? new Properties() : PropertiesUtil
            .deepCopy(info);
    final PName tenantId = JDBCUtil.getTenantId(url, info);
    if (this.info.isEmpty() && tenantId == null) {
        this.services = services;
    } else {
        // Create child services keyed by tenantId to track resource usage
        // for a tenantId for all connections on this JVM.
        if (tenantId != null) {
            services = services.getChildQueryServices(tenantId
                    .getBytesPtr());
        }
        ReadOnlyProps currentProps = services.getProps();
        final ReadOnlyProps augmentedProps = currentProps
                .addAll(filterKnownNonProperties(this.info));
        this.services = augmentedProps == currentProps ? services
                : new DelegateConnectionQueryServices(services) {
                    @Override
                    public ReadOnlyProps getProps() {
                        return augmentedProps;
                    }
                };
    }

    Long scnParam = JDBCUtil.getCurrentSCN(url, this.info);
    checkScn(scnParam);
    Long buildIndexAtParam = JDBCUtil.getBuildIndexSCN(url, this.info);
    checkBuildIndexAt(buildIndexAtParam);
    checkScnAndBuildIndexAtEquality(scnParam, buildIndexAtParam);

    this.scn = scnParam != null ? scnParam : buildIndexAtParam;
    this.buildingIndex = buildingIndex || buildIndexAtParam != null;
    this.isAutoFlush = this.services.getProps().getBoolean(
            QueryServices.TRANSACTIONS_ENABLED,
            QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED)
            && this.services.getProps().getBoolean(
                    QueryServices.AUTO_FLUSH_ATTRIB,
                    QueryServicesOptions.DEFAULT_AUTO_FLUSH);
    this.isAutoCommit = JDBCUtil.getAutoCommit(
            url,
            this.info,
            this.services.getProps().getBoolean(
                    QueryServices.AUTO_COMMIT_ATTRIB,
                    QueryServicesOptions.DEFAULT_AUTO_COMMIT));
    this.consistency = JDBCUtil.getConsistencyLevel(
            url,
            this.info,
            this.services.getProps().get(QueryServices.CONSISTENCY_ATTRIB,
                    QueryServicesOptions.DEFAULT_CONSISTENCY_LEVEL));
    // currently we are not resolving schema set through property, so if
    // schema doesn't exists ,connection will not fail
    // but queries may fail
    this.schema = JDBCUtil.getSchema(
            url,
            this.info,
            this.services.getProps().get(QueryServices.SCHEMA_ATTRIB,
                    QueryServicesOptions.DEFAULT_SCHEMA));
    this.tenantId = tenantId;
    this.mutateBatchSize = JDBCUtil.getMutateBatchSize(url, this.info,
            this.services.getProps());
    this.mutateBatchSizeBytes = JDBCUtil.getMutateBatchSizeBytes(url,
            this.info, this.services.getProps());
    datePattern = this.services.getProps().get(
            QueryServices.DATE_FORMAT_ATTRIB, DateUtil.DEFAULT_DATE_FORMAT);
    timePattern = this.services.getProps().get(
            QueryServices.TIME_FORMAT_ATTRIB, DateUtil.DEFAULT_TIME_FORMAT);
    timestampPattern = this.services.getProps().get(
            QueryServices.TIMESTAMP_FORMAT_ATTRIB,
            DateUtil.DEFAULT_TIMESTAMP_FORMAT);
    String numberPattern = this.services.getProps().get(
            QueryServices.NUMBER_FORMAT_ATTRIB,
            NumberUtil.DEFAULT_NUMBER_FORMAT);
    int maxSize = this.services.getProps().getInt(
            QueryServices.MAX_MUTATION_SIZE_ATTRIB,
            QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
    int maxSizeBytes = this.services.getProps().getInt(
            QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB,
            QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE_BYTES);
    Format dateFormat = DateUtil.getDateFormatter(datePattern);
    Format timeFormat = DateUtil.getDateFormatter(timePattern);
    Format timestampFormat = DateUtil.getDateFormatter(timestampPattern);
    formatters.put(PDate.INSTANCE, dateFormat);
    formatters.put(PTime.INSTANCE, timeFormat);
    formatters.put(PTimestamp.INSTANCE, timestampFormat);
    formatters.put(PUnsignedDate.INSTANCE, dateFormat);
    formatters.put(PUnsignedTime.INSTANCE, timeFormat);
    formatters.put(PUnsignedTimestamp.INSTANCE, timestampFormat);
    formatters.put(PDecimal.INSTANCE,
            FunctionArgumentType.NUMERIC.getFormatter(numberPattern));
    formatters.put(PVarbinary.INSTANCE, VarBinaryFormatter.INSTANCE);
    // We do not limit the metaData on a connection less than the global
    // one, as there's not much that will be cached here.
    Pruner pruner = new Pruner() {

        @Override
        public boolean prune(PTable table) {
            long maxTimestamp = scn == null ? HConstants.LATEST_TIMESTAMP
                    : scn;
            return (table.getType() != PTableType.SYSTEM && (table
                    .getTimeStamp() >= maxTimestamp || (table.getTenantId() != null && !Objects
                    .equal(tenantId, table.getTenantId()))));
        }

        @Override
        public boolean prune(PFunction function) {
            long maxTimestamp = scn == null ? HConstants.LATEST_TIMESTAMP
                    : scn;
            return (function.getTimeStamp() >= maxTimestamp || (function
                    .getTenantId() != null && !Objects.equal(tenantId,
                    function.getTenantId())));
        }
    };
    this.isRequestLevelMetricsEnabled = JDBCUtil
            .isCollectingRequestLevelMetricsEnabled(url, info,
                    this.services.getProps());
    this.mutationState = mutationState == null ? newMutationState(maxSize,
            maxSizeBytes) : new MutationState(mutationState);
    this.metaData = metaData;
    this.metaData.pruneTables(pruner);
    this.metaData.pruneFunctions(pruner);

    // setup tracing, if its enabled
    this.sampler = Tracing.getConfiguredSampler(this);
    this.customTracingAnnotations = getImmutableCustomTracingAnnotations();
    this.scannerQueue = new LinkedBlockingQueue<>();
    this.tableResultIteratorFactory = new DefaultTableResultIteratorFactory();
    this.isRunningUpgrade = isRunningUpgrade;

    // Register with the query services only after every field above has
    // been assigned, so nothing reached through the registration can see a
    // partially constructed connection (e.g. a null scannerQueue).
    this.services.addConnection(this);
    GLOBAL_OPEN_PHOENIX_CONNECTIONS.increment();
}
383
384 private static void checkScn(Long scnParam) throws SQLException {
385 if (scnParam != null && scnParam < 0) {
386 throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_SCN)
387 .build().buildException();
388 }
389 }
390
391 private static void checkBuildIndexAt(Long replayAtParam) throws SQLException {
392 if (replayAtParam != null && replayAtParam < 0) {
393 throw new SQLExceptionInfo.Builder(
394 SQLExceptionCode.INVALID_REPLAY_AT).build()
395 .buildException();
396 }
397 }
398
399 private static void checkScnAndBuildIndexAtEquality(Long scnParam, Long replayAt)
400 throws SQLException {
401 if (scnParam != null && replayAt != null && !scnParam.equals(replayAt)) {
402 throw new SQLExceptionInfo.Builder(
403 SQLExceptionCode.UNEQUAL_SCN_AND_BUILD_INDEX_AT).build()
404 .buildException();
405 }
406 }
407
408 private static Properties filterKnownNonProperties(Properties info) {
409 Properties prunedProperties = info;
410 for (String property : PhoenixRuntime.CONNECTION_PROPERTIES) {
411 if (info.containsKey(property)) {
412 if (prunedProperties == info) {
413 prunedProperties = PropertiesUtil.deepCopy(info);
414 }
415 prunedProperties.remove(property);
416 }
417 }
418 return prunedProperties;
419 }
420
421 private ImmutableMap<String, String> getImmutableCustomTracingAnnotations() {
422 Builder<String, String> result = ImmutableMap.builder();
423 result.putAll(JDBCUtil.getAnnotations(url, info));
424 if (getSCN() != null) {
425 result.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, getSCN().toString());
426 }
427 if (getTenantId() != null) {
428 result.put(PhoenixRuntime.TENANT_ID_ATTRIB, getTenantId()
429 .getString());
430 }
431 return result.build();
432 }
433
434 public Sampler<?> getSampler() {
435 return this.sampler;
436 }
437
438 public void setSampler(Sampler<?> sampler) throws SQLException {
439 this.sampler = sampler;
440 }
441
442 public Map<String, String> getCustomTracingAnnotations() {
443 return customTracingAnnotations;
444 }
445
446 public int executeStatements(Reader reader, List<Object> binds,
447 PrintStream out) throws IOException, SQLException {
448 int bindsOffset = 0;
449 int nStatements = 0;
450 PhoenixStatementParser parser = new PhoenixStatementParser(reader);
451 try {
452 while (true) {
453 PhoenixPreparedStatement stmt = null;
454 try {
455 stmt = new PhoenixPreparedStatement(this, parser);
456 ParameterMetaData paramMetaData = stmt
457 .getParameterMetaData();
458 for (int i = 0; i < paramMetaData.getParameterCount(); i++) {
459 stmt.setObject(i + 1, binds.get(bindsOffset + i));
460 }
461 long start = System.currentTimeMillis();
462 boolean isQuery = stmt.execute();
463 if (isQuery) {
464 ResultSet rs = stmt.getResultSet();
465 if (!rs.next()) {
466 if (out != null) {
467 out.println("no rows selected");
468 }
469 } else {
470 int columnCount = 0;
471 if (out != null) {
472 ResultSetMetaData md = rs.getMetaData();
473 columnCount = md.getColumnCount();
474 for (int i = 1; i <= columnCount; i++) {
475 int displayWidth = md
476 .getColumnDisplaySize(i);
477 String label = md.getColumnLabel(i);
478 if (md.isSigned(i)) {
479 out.print(displayWidth < label.length() ? label
480 .substring(0, displayWidth)
481 : Strings.padStart(label,
482 displayWidth, ' '));
483 out.print(' ');
484 } else {
485 out.print(displayWidth < label.length() ? label
486 .substring(0, displayWidth)
487 : Strings.padEnd(
488 md.getColumnLabel(i),
489 displayWidth, ' '));
490 out.print(' ');
491 }
492 }
493 out.println();
494 for (int i = 1; i <= columnCount; i++) {
495 int displayWidth = md
496 .getColumnDisplaySize(i);
497 out.print(Strings.padStart("",
498 displayWidth, '-'));
499 out.print(' ');
500 }
501 out.println();
502 }
503 do {
504 if (out != null) {
505 ResultSetMetaData md = rs.getMetaData();
506 for (int i = 1; i <= columnCount; i++) {
507 int displayWidth = md
508 .getColumnDisplaySize(i);
509 String value = rs.getString(i);
510 String valueString = value == null ? QueryConstants.NULL_DISPLAY_TEXT
511 : value;
512 if (md.isSigned(i)) {
513 out.print(Strings.padStart(
514 valueString, displayWidth,
515 ' '));
516 } else {
517 out.print(Strings.padEnd(
518 valueString, displayWidth,
519 ' '));
520 }
521 out.print(' ');
522 }
523 out.println();
524 }
525 } while (rs.next());
526 }
527 } else if (out != null) {
528 int updateCount = stmt.getUpdateCount();
529 if (updateCount >= 0) {
530 out.println((updateCount == 0 ? "no" : updateCount)
531 + (updateCount == 1 ? " row " : " rows ")
532 + stmt.getUpdateOperation().toString());
533 }
534 }
535 bindsOffset += paramMetaData.getParameterCount();
536 double elapsedDuration = ((System.currentTimeMillis() - start) / 1000.0);
537 out.println("Time: " + elapsedDuration + " sec(s)\n");
538 nStatements++;
539 } finally {
540 if (stmt != null) {
541 stmt.close();
542 }
543 }
544 }
545 } catch (EOFException e) {
546 }
547 return nStatements;
548 }
549
550 public @Nullable PName getTenantId() {
551 return tenantId;
552 }
553
554 public Long getSCN() {
555 return scn;
556 }
557
558 public boolean isBuildingIndex() {
559 return buildingIndex;
560 }
561
562 public int getMutateBatchSize() {
563 return mutateBatchSize;
564 }
565
566 public long getMutateBatchSizeBytes() {
567 return mutateBatchSizeBytes;
568 }
569
570 public PMetaData getMetaDataCache() {
571 return metaData;
572 }
573
574 public PTable getTable(PTableKey key) throws TableNotFoundException {
575 return metaData.getTableRef(key).getTable();
576 }
577
578 public PTableRef getTableRef(PTableKey key) throws TableNotFoundException {
579 return metaData.getTableRef(key);
580 }
581
582 protected MutationState newMutationState(int maxSize, int maxSizeBytes) {
583 return new MutationState(maxSize, maxSizeBytes, this);
584 }
585
586 public MutationState getMutationState() {
587 return mutationState;
588 }
589
590 public String getDatePattern() {
591 return datePattern;
592 }
593
594 public Format getFormatter(PDataType type) {
595 return formatters.get(type);
596 }
597
598 public String getURL() {
599 return url;
600 }
601
602 public ConnectionQueryServices getQueryServices() {
603 return services;
604 }
605
606 @Override
607 public void clearWarnings() throws SQLException {
608 }
609
610 private void closeStatements() throws SQLException {
611 List<? extends PhoenixStatement> statements = this.statements;
612 // create new list to prevent close of statements
613 // from modifying this list.
614 this.statements = Lists.newArrayList();
615 try {
616 mutationState.rollback();
617 } catch (SQLException e) {
618 // ignore any exceptions while rolling back
619 } finally {
620 try {
621 SQLCloseables.closeAll(statements);
622 } finally {
623 statements.clear();
624 }
625 }
626 }
627
628 private void checkOpen() throws SQLException {
629 if (isClosed) {
630 throw new SQLExceptionInfo.Builder(
631 SQLExceptionCode.CONNECTION_CLOSED).build()
632 .buildException();
633 }
634 }
635
636 @Override
637 public void close() throws SQLException {
638 if (isClosed) {
639 return;
640 }
641 try {
642 clearMetrics();
643 try {
644 if (traceScope != null) {
645 traceScope.close();
646 }
647 closeStatements();
648 } finally {
649 services.removeConnection(this);
650 }
651 } finally {
652 isClosed = true;
653 GLOBAL_OPEN_PHOENIX_CONNECTIONS.decrement();
654 }
655 }
656
657 @Override
658 public void commit() throws SQLException {
659 CallRunner.run(new CallRunner.CallableThrowable<Void, SQLException>() {
660 @Override
661 public Void call() throws SQLException {
662 checkOpen();
663 mutationState.commit();
664 return null;
665 }
666 }, Tracing.withTracing(this, "committing mutations"));
667 statementExecutionCounter = 0;
668 }
669
670 @Override
671 public Array createArrayOf(String typeName, Object[] elements)
672 throws SQLException {
673 checkOpen();
674 PDataType arrayPrimitiveType = PDataType.fromSqlTypeName(typeName);
675 return PArrayDataType.instantiatePhoenixArray(arrayPrimitiveType,
676 elements);
677 }
678
679 @Override
680 public Blob createBlob() throws SQLException {
681 throw new SQLFeatureNotSupportedException();
682 }
683
684 @Override
685 public Clob createClob() throws SQLException {
686 throw new SQLFeatureNotSupportedException();
687 }
688
689 @Override
690 public NClob createNClob() throws SQLException {
691 throw new SQLFeatureNotSupportedException();
692 }
693
694 @Override
695 public SQLXML createSQLXML() throws SQLException {
696 throw new SQLFeatureNotSupportedException();
697 }
698
699 public List<PhoenixStatement> getStatements() {
700 return statements;
701 }
702
703 @Override
704 public Statement createStatement() throws SQLException {
705 checkOpen();
706 PhoenixStatement statement = new PhoenixStatement(this);
707 statements.add(statement);
708 return statement;
709 }
710
711 /**
712 * Back-door way to inject processing into walking through a result set
713 *
714 * @param statementFactory
715 * @return PhoenixStatement
716 * @throws SQLException
717 */
718 public PhoenixStatement createStatement(
719 PhoenixStatementFactory statementFactory) throws SQLException {
720 PhoenixStatement statement = statementFactory.newStatement(this);
721 statements.add(statement);
722 return statement;
723 }
724
725 @Override
726 public Statement createStatement(int resultSetType, int resultSetConcurrency)
727 throws SQLException {
728 checkOpen();
729 if (resultSetType != ResultSet.TYPE_FORWARD_ONLY
730 || resultSetConcurrency != ResultSet.CONCUR_READ_ONLY) {
731 throw new SQLFeatureNotSupportedException();
732 }
733 return createStatement();
734 }
735
736 @Override
737 public Statement createStatement(int resultSetType,
738 int resultSetConcurrency, int resultSetHoldability)
739 throws SQLException {
740 checkOpen();
741 if (resultSetHoldability != ResultSet.CLOSE_CURSORS_AT_COMMIT) {
742 throw new SQLFeatureNotSupportedException();
743 }
744 return createStatement(resultSetType, resultSetConcurrency);
745 }
746
747 @Override
748 public Struct createStruct(String typeName, Object[] attributes)
749 throws SQLException {
750 throw new SQLFeatureNotSupportedException();
751 }
752
753 @Override
754 public boolean getAutoCommit() throws SQLException {
755 return isAutoCommit;
756 }
757
758 public boolean getAutoFlush() {
759 return isAutoFlush;
760 }
761
762 public void setAutoFlush(boolean autoFlush) throws SQLException {
763 if (autoFlush
764 && !this.services.getProps().getBoolean(
765 QueryServices.TRANSACTIONS_ENABLED,
766 QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED)) {
767 throw new SQLExceptionInfo.Builder(
768 SQLExceptionCode.TX_MUST_BE_ENABLED_TO_SET_AUTO_FLUSH)
769 .build().buildException();
770 }
771 this.isAutoFlush = autoFlush;
772 }
773
774 public void flush() throws SQLException {
775 mutationState.sendUncommitted();
776 }
777
778 public void setTransactionContext(PhoenixTransactionContext txContext)
779 throws SQLException {
780 if (!this.services.getProps().getBoolean(
781 QueryServices.TRANSACTIONS_ENABLED,
782 QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED)) {
783 throw new SQLExceptionInfo.Builder(
784 SQLExceptionCode.TX_MUST_BE_ENABLED_TO_SET_TX_CONTEXT)
785 .build().buildException();
786 }
787 this.mutationState.rollback();
788 this.mutationState = new MutationState(this.mutationState.getMaxSize(),
789 this.mutationState.getMaxSizeBytes(), this, txContext);
790
791 // Write data to HBase after each statement execution as the commit may
792 // not
793 // come through Phoenix APIs.
794 setAutoFlush(true);
795 }
796
797 public Consistency getConsistency() {
798 return this.consistency;
799 }
800
801 @Override
802 public String getCatalog() throws SQLException {
803 return tenantId == null ? "" : tenantId.getString();
804 }
805
806 @Override
807 public Properties getClientInfo() throws SQLException {
808 // Defensive copy so client cannot change
809 return new Properties(info);
810 }
811
812 @Override
813 public String getClientInfo(String name) {
814 return info.getProperty(name);
815 }
816
817 @Override
818 public int getHoldability() throws SQLException {
819 return ResultSet.CLOSE_CURSORS_AT_COMMIT;
820 }
821
822 @Override
823 public DatabaseMetaData getMetaData() throws SQLException {
824 checkOpen();
825 return new PhoenixDatabaseMetaData(this);
826 }
827
828 @Override
829 public int getTransactionIsolation() throws SQLException {
830 boolean transactionsEnabled = getQueryServices().getProps().getBoolean(
831 QueryServices.TRANSACTIONS_ENABLED,
832 QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED);
833 return transactionsEnabled ? Connection.TRANSACTION_REPEATABLE_READ
834 : Connection.TRANSACTION_READ_COMMITTED;
835 }
836
837 @Override
838 public Map<String, Class<?>> getTypeMap() throws SQLException {
839 return Collections.emptyMap();
840 }
841
842 @Override
843 public SQLWarning getWarnings() throws SQLException {
844 return null;
845 }
846
847 @Override
848 public boolean isClosed() throws SQLException {
849 return isClosed;
850 }
851
852 @Override
853 public boolean isReadOnly() throws SQLException {
854 return readOnly || (scn != null && !buildingIndex && !isRunningUpgrade);
855 }
856
857 @Override
858 public boolean isValid(int timeout) throws SQLException {
859 // TODO: run query here or ping
860 return !isClosed;
861 }
862
863 @Override
864 public String nativeSQL(String sql) throws SQLException {
865 throw new SQLFeatureNotSupportedException();
866 }
867
868 @Override
869 public CallableStatement prepareCall(String sql) throws SQLException {
870 throw new SQLFeatureNotSupportedException();
871 }
872
873 @Override
874 public CallableStatement prepareCall(String sql, int resultSetType,
875 int resultSetConcurrency) throws SQLException {
876 throw new SQLFeatureNotSupportedException();
877 }
878
879 @Override
880 public CallableStatement prepareCall(String sql, int resultSetType,
881 int resultSetConcurrency, int resultSetHoldability)
882 throws SQLException {
883 throw new SQLFeatureNotSupportedException();
884 }
885
886 @Override
887 public PreparedStatement prepareStatement(String sql) throws SQLException {
888 checkOpen();
889 PhoenixPreparedStatement statement = new PhoenixPreparedStatement(this,
890 sql);
891 statements.add(statement);
892 return statement;
893 }
894
895 @Override
896 public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys)
897 throws SQLException {
898 checkOpen();
899 // Ignore autoGeneratedKeys, and just execute the statement.
900 return prepareStatement(sql);
901 }
902
903 @Override
904 public PreparedStatement prepareStatement(String sql, int[] columnIndexes)
905 throws SQLException {
906 checkOpen();
907 // Ignore columnIndexes, and just execute the statement.
908 return prepareStatement(sql);
909 }
910
911 @Override
912 public PreparedStatement prepareStatement(String sql, String[] columnNames)
913 throws SQLException {
914 checkOpen();
915 // Ignore columnNames, and just execute the statement.
916 return prepareStatement(sql);
917 }
918
919 @Override
920 public PreparedStatement prepareStatement(String sql, int resultSetType,
921 int resultSetConcurrency) throws SQLException {
922 checkOpen();
923 if (resultSetType != ResultSet.TYPE_FORWARD_ONLY
924 || resultSetConcurrency != ResultSet.CONCUR_READ_ONLY) {
925 throw new SQLFeatureNotSupportedException();
926 }
927 return prepareStatement(sql);
928 }
929
930 @Override
931 public PreparedStatement prepareStatement(String sql, int resultSetType,
932 int resultSetConcurrency, int resultSetHoldability)
933 throws SQLException {
934 checkOpen();
935 if (resultSetHoldability != ResultSet.CLOSE_CURSORS_AT_COMMIT) {
936 throw new SQLFeatureNotSupportedException();
937 }
938 return prepareStatement(sql, resultSetType, resultSetConcurrency);
939 }
940
941 @Override
942 public void releaseSavepoint(Savepoint savepoint) throws SQLException {
943 throw new SQLFeatureNotSupportedException();
944 }
945
946 @Override
947 public void rollback() throws SQLException {
948 CallRunner.run(new CallRunner.CallableThrowable<Void, SQLException>() {
949 @Override
950 public Void call() throws SQLException {
951 checkOpen();
952 mutationState.rollback();
953 return null;
954 }
955 }, Tracing.withTracing(this, "rolling back"));
956 statementExecutionCounter = 0;
957 }
958
959 @Override
960 public void rollback(Savepoint savepoint) throws SQLException {
961 throw new SQLFeatureNotSupportedException();
962 }
963
964 @Override
965 public void setAutoCommit(boolean isAutoCommit) throws SQLException {
966 checkOpen();
967 this.isAutoCommit = isAutoCommit;
968 }
969
970 public void setConsistency(Consistency val) {
971 this.consistency = val;
972 }
973
974 @Override
975 public void setCatalog(String catalog) throws SQLException {
976 checkOpen();
977 if (!this.getCatalog().equalsIgnoreCase(catalog)) {
978 // allow noop calls to pass through.
979 throw new SQLFeatureNotSupportedException();
980 }
981 // TODO:
982 // if (catalog == null) {
983 // tenantId = null;
984 // } else {
985 // tenantId = PNameFactory.newName(catalog);
986 // }
987 }
988
989 @Override
990 public void setClientInfo(Properties properties)
991 throws SQLClientInfoException {
992 throw new UnsupportedOperationException();
993 }
994
995 @Override
996 public void setClientInfo(String name, String value)
997 throws SQLClientInfoException {
998 throw new UnsupportedOperationException();
999 }
1000
1001 @Override
1002 public void setHoldability(int holdability) throws SQLException {
1003 throw new SQLFeatureNotSupportedException();
1004 }
1005
1006 @Override
1007 public void setReadOnly(boolean readOnly) throws SQLException {
1008 checkOpen();
1009 this.readOnly = readOnly;
1010 }
1011
1012 @Override
1013 public Savepoint setSavepoint() throws SQLException {
1014 throw new SQLFeatureNotSupportedException();
1015 }
1016
1017 @Override
1018 public Savepoint setSavepoint(String name) throws SQLException {
1019 throw new SQLFeatureNotSupportedException();
1020 }
1021
1022 @Override
1023 public void setTransactionIsolation(int level) throws SQLException {
1024 checkOpen();
1025 boolean transactionsEnabled = getQueryServices().getProps().getBoolean(
1026 QueryServices.TRANSACTIONS_ENABLED,
1027 QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED);
1028 if (level == Connection.TRANSACTION_SERIALIZABLE) {
1029 throw new SQLFeatureNotSupportedException();
1030 }
1031 if (!transactionsEnabled
1032 && level == Connection.TRANSACTION_REPEATABLE_READ) {
1033 throw new SQLExceptionInfo.Builder(
1034 SQLExceptionCode.TX_MUST_BE_ENABLED_TO_SET_ISOLATION_LEVEL)
1035 .build().buildException();
1036 }
1037 }
1038
1039 @Override
1040 public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
1041 throw new SQLFeatureNotSupportedException();
1042 }
1043
1044 @Override
1045 public boolean isWrapperFor(Class<?> iface) throws SQLException {
1046 return iface.isInstance(this);
1047 }
1048
1049 @SuppressWarnings("unchecked")
1050 @Override
1051 public <T> T unwrap(Class<T> iface) throws SQLException {
1052 if (!iface.isInstance(this)) {
1053 throw new SQLExceptionInfo.Builder(
1054 SQLExceptionCode.CLASS_NOT_UNWRAPPABLE)
1055 .setMessage(
1056 this.getClass().getName()
1057 + " not unwrappable from "
1058 + iface.getName()).build().buildException();
1059 }
1060 return (T) this;
1061 }
1062
1063 @Override
1064 public void setSchema(String schema) throws SQLException {
1065 checkOpen();
1066 this.schema = schema;
1067 }
1068
1069 @Override
1070 public String getSchema() throws SQLException {
1071 return SchemaUtil.normalizeIdentifier(this.schema);
1072 }
1073
1074 public PSchema getSchema(PTableKey key) throws SchemaNotFoundException {
1075 return metaData.getSchema(key);
1076 }
1077
1078 @Override
1079 public void abort(Executor executor) throws SQLException {
1080 checkOpen();
1081 }
1082
1083 @Override
1084 public void setNetworkTimeout(Executor executor, int milliseconds)
1085 throws SQLException {
1086 checkOpen();
1087 }
1088
1089 @Override
1090 public int getNetworkTimeout() throws SQLException {
1091 // TODO Auto-generated method stub
1092 return 0;
1093 }
1094
1095 @Override
1096 public void addTable(PTable table, long resolvedTime) throws SQLException {
1097 metaData.addTable(table, resolvedTime);
1098 // Cascade through to connectionQueryServices too
1099 getQueryServices().addTable(table, resolvedTime);
1100 }
1101
1102 @Override
1103 public void updateResolvedTimestamp(PTable table, long resolvedTime)
1104 throws SQLException {
1105 metaData.updateResolvedTimestamp(table, resolvedTime);
1106 // Cascade through to connectionQueryServices too
1107 getQueryServices().updateResolvedTimestamp(table, resolvedTime);
1108 }
1109
1110 @Override
1111 public void addFunction(PFunction function) throws SQLException {
1112 // TODO: since a connection is only used by one thread at a time,
1113 // we could modify this metadata in place since it's not shared.
1114 if (scn == null || scn > function.getTimeStamp()) {
1115 metaData.addFunction(function);
1116 }
1117 // Cascade through to connectionQueryServices too
1118 getQueryServices().addFunction(function);
1119 }
1120
1121 @Override
1122 public void addSchema(PSchema schema) throws SQLException {
1123 metaData.addSchema(schema);
1124 // Cascade through to connectionQueryServices too
1125 getQueryServices().addSchema(schema);
1126 }
1127
1128 @Override
1129 public void removeTable(PName tenantId, String tableName,
1130 String parentTableName, long tableTimeStamp) throws SQLException {
1131 metaData.removeTable(tenantId, tableName, parentTableName,
1132 tableTimeStamp);
1133 // Cascade through to connectionQueryServices too
1134 getQueryServices().removeTable(tenantId, tableName, parentTableName,
1135 tableTimeStamp);
1136 }
1137
1138 @Override
1139 public void removeFunction(PName tenantId, String functionName,
1140 long tableTimeStamp) throws SQLException {
1141 metaData.removeFunction(tenantId, functionName, tableTimeStamp);
1142 // Cascade through to connectionQueryServices too
1143 getQueryServices().removeFunction(tenantId, functionName,
1144 tableTimeStamp);
1145 }
1146
1147 @Override
1148 public void removeColumn(PName tenantId, String tableName,
1149 List<PColumn> columnsToRemove, long tableTimeStamp,
1150 long tableSeqNum, long resolvedTime) throws SQLException {
1151 metaData.removeColumn(tenantId, tableName, columnsToRemove,
1152 tableTimeStamp, tableSeqNum, resolvedTime);
1153 // Cascade through to connectionQueryServices too
1154 getQueryServices().removeColumn(tenantId, tableName, columnsToRemove,
1155 tableTimeStamp, tableSeqNum, resolvedTime);
1156 }
1157
1158 protected boolean removeStatement(PhoenixStatement statement)
1159 throws SQLException {
1160 return statements.remove(statement);
1161 }
1162
1163 public KeyValueBuilder getKeyValueBuilder() {
1164 return this.services.getKeyValueBuilder();
1165 }
1166
1167 /**
1168 * Used to track executions of {@link Statement}s and
1169 * {@link PreparedStatement}s that were created from this connection before
1170 * commit or rollback. 0-based. Used to associate partial save errors with
1171 * SQL statements invoked by users.
1172 *
1173 * @see CommitException
1174 * @see #incrementStatementExecutionCounter()
1175 */
1176 public int getStatementExecutionCounter() {
1177 return statementExecutionCounter;
1178 }
1179
1180 public void incrementStatementExecutionCounter() {
1181 statementExecutionCounter++;
1182 }
1183
1184 public TraceScope getTraceScope() {
1185 return traceScope;
1186 }
1187
1188 public void setTraceScope(TraceScope traceScope) {
1189 this.traceScope = traceScope;
1190 }
1191
1192 public Map<String, Map<MetricType, Long>> getMutationMetrics() {
1193 return mutationState.getMutationMetricQueue().aggregate();
1194 }
1195
1196 public Map<String, Map<MetricType, Long>> getReadMetrics() {
1197 return mutationState.getReadMetricQueue() != null ? mutationState
1198 .getReadMetricQueue().aggregate() : Collections
1199 .<String, Map<MetricType, Long>> emptyMap();
1200 }
1201
1202 public boolean isRequestLevelMetricsEnabled() {
1203 return isRequestLevelMetricsEnabled;
1204 }
1205
1206 public void clearMetrics() {
1207 mutationState.getMutationMetricQueue().clearMetrics();
1208 if (mutationState.getReadMetricQueue() != null) {
1209 mutationState.getReadMetricQueue().clearMetrics();
1210 }
1211 }
1212
1213 /**
1214 * Returns true if this connection is being used to upgrade the data due to
1215 * PHOENIX-2067 and false otherwise.
1216 *
1217 * @return
1218 */
1219 public boolean isDescVarLengthRowKeyUpgrade() {
1220 return isDescVarLengthRowKeyUpgrade;
1221 }
1222
1223 /**
1224 * Added for tests only. Do not use this elsewhere.
1225 */
1226 public ParallelIteratorFactory getIteratorFactory() {
1227 return parallelIteratorFactory;
1228 }
1229
1230 /**
1231 * Added for testing purposes. Do not use this elsewhere.
1232 */
1233 public void setIteratorFactory(
1234 ParallelIteratorFactory parallelIteratorFactory) {
1235 this.parallelIteratorFactory = parallelIteratorFactory;
1236 }
1237
1238 public void addIteratorForLeaseRenewal(@Nonnull TableResultIterator itr) {
1239 if (services.isRenewingLeasesEnabled()) {
1240 checkNotNull(itr);
1241 scannerQueue.add(new WeakReference<TableResultIterator>(itr));
1242 }
1243 }
1244
1245 public LinkedBlockingQueue<WeakReference<TableResultIterator>> getScanners() {
1246 return scannerQueue;
1247 }
1248
1249 @VisibleForTesting
1250 @Nonnull
1251 public TableResultIteratorFactory getTableResultIteratorFactory() {
1252 return tableResultIteratorFactory;
1253 }
1254
1255 @VisibleForTesting
1256 public void setTableResultIteratorFactory(TableResultIteratorFactory factory) {
1257 checkNotNull(factory);
1258 this.tableResultIteratorFactory = factory;
1259 }
1260
1261 @Override
1262 public void removeSchema(PSchema schema, long schemaTimeStamp) {
1263 metaData.removeSchema(schema, schemaTimeStamp);
1264 // Cascade through to connectionQueryServices too
1265 getQueryServices().removeSchema(schema, schemaTimeStamp);
1266
1267 }
1268
1269 public boolean isRunningUpgrade() {
1270 return isRunningUpgrade;
1271 }
1272
1273 public void setRunningUpgrade(boolean isRunningUpgrade) {
1274 this.isRunningUpgrade = isRunningUpgrade;
1275 }
1276
1277 }