# File lib/mongo/collection_writer.rb, line 32
#
# Binds this writer to +collection+ and caches the objects reachable from it.
#
# collection - object responding to #name and #db; the db must respond to
#              #connection, and the connection to #logger.
#
# The maximum write batch size starts at the client-wide default constant.
def initialize(collection)
  @collection = collection
  @name       = collection.name
  @db         = collection.db
  @connection = @db.connection
  @logger     = @connection.logger

  @max_write_batch_size = Mongo::MongoClient::DEFAULT_MAX_WRITE_BATCH_SIZE
end
# Common implementation, used only for the new batch write commands (insert, update, delete) and the legacy batch insert.
# File lib/mongo/collection_writer.rb, line 42
#
# Writes +documents+ in batches that are built up one document at a time.
# Each document is serialized with BSON::BSON_CODER and appended to a shared
# message buffer. A batch is closed when it holds @max_write_batch_size
# documents, when appending the next serialized document would push the
# message past max_append_size, or when a serialization error has to be
# reported. The batch is then handed to batch_message_send.
#
# op_type    - write type, passed through to the batch_message_* helpers.
#              For :insert with a non-nil opts[:ordered], each entry of
#              +documents+ is a hash whose :d value is the document to write.
# documents  - non-empty array of documents (or wrapper hashes, see above).
#              When opts[:ordered] is non-nil the entries are also read for
#              :ord when an error is reported.
# check_keys - forwarded to the serializer.
# opts       - options hash. Keys read here: :ordered, :continue_on_error,
#              :collect_on_error. It is also passed to get_write_concern and
#              stored in every exchange record.
#
# Returns [error_docs, errors, write_concern_errors, exchanges].
# Raises Mongo::OperationFailure if +documents+ is empty.
def batch_write_incremental(op_type, documents, check_keys=true, opts={})
  raise Mongo::OperationFailure, "Request contains no documents" if documents.empty?
  write_concern = get_write_concern(opts, @collection)
  max_message_size, max_append_size, max_serialize_size = batch_write_max_sizes(write_concern)
  ordered = opts[:ordered]
  # An explicit :ordered => false switches on both error-tolerant modes.
  continue_on_error = !!opts[:continue_on_error] || ordered == false
  collect_on_error = !!opts[:collect_on_error] || ordered == false
  error_docs = [] # docs with serialization errors
  errors = []
  write_concern_errors = []
  exchanges = []
  # Survives across batches: a document that did not fit into the previous
  # batch stays serialized here so it is not serialized a second time.
  serialized_doc = nil
  message = BSON::ByteBuffer.new("", max_message_size)
  @max_write_batch_size = @collection.db.connection.max_write_batch_size
  docs = documents.dup # work on a copy; the caller's array is not shifted
  # throw(:error) anywhere below aborts all remaining batches at once.
  catch(:error) do
    until docs.empty? || (!errors.empty? && !collect_on_error) # process documents a batch at a time
      batch_docs = []
      batch_message_initialize(message, op_type, continue_on_error, write_concern)
      while !docs.empty? && batch_docs.size < @max_write_batch_size
        begin
          doc = docs.first
          doc = doc[:d] if op_type == :insert && !ordered.nil? #check_keys for :update outside of serialize
          serialized_doc ||= BSON::BSON_CODER.serialize(doc, check_keys, true, max_serialize_size)
        rescue BSON::InvalidDocument, BSON::InvalidKeyName, BSON::InvalidStringEncoding => ex
          bulk_message = "Bulk write error - #{ex.message} - examine result for complete information"
          # Only wrap the error when the caller supplied :ordered; otherwise
          # the original serialization exception is recorded as-is.
          ex = BulkWriteError.new(bulk_message, Mongo::ErrorCode::INVALID_BSON,
            {:op_type => op_type, :serialize => doc, :ord => docs.first[:ord], :error => ex}) unless ordered.nil?
          error_docs << docs.shift
          errors << ex
          next if collect_on_error # skip the bad document and keep filling this batch
          throw(:error) if batch_docs.empty? # nothing pending to send: stop immediately
          break # defer exit and send batch
        end
        # The document does not fit: close this batch. serialized_doc is kept
        # and the document stays at the head of docs for the next batch.
        break if message.size + serialized_doc.size > max_append_size
        batch_docs << docs.shift
        batch_message_append(message, serialized_doc, write_concern)
        serialized_doc = nil
      end
      begin
        # NOTE(review): when batch_docs is empty the send is skipped, so
        # `response` keeps whatever it held before (nil on the first pass) and
        # an exchange is still recorded - confirm this is intended.
        response = batch_message_send(message, op_type, batch_docs, write_concern, continue_on_error) if batch_docs.size > 0
        exchanges << {:op_type => op_type, :batch => batch_docs, :opts => opts, :response => response}
      rescue Mongo::WriteConcernError => ex
        # Write concern errors are collected separately and never stop the loop.
        write_concern_errors << ex
        exchanges << {:op_type => op_type, :batch => batch_docs, :opts => opts, :response => ex.result}
      rescue Mongo::OperationFailure => ex
        errors << ex
        exchanges << {:op_type => op_type, :batch => batch_docs, :opts => opts, :response => ex.result}
        throw(:error) unless continue_on_error
      end
    end
  end
  [error_docs, errors, write_concern_errors, exchanges]
end
# File lib/mongo/collection_writer.rb, line 97
#
# Writes +documents+ in batches whose size adapts to serialization failures.
# The batch size starts at min(documents.size, connection max batch size).
# It grows by roughly 2**(1/10) after every successful send and is halved
# whenever a batch raises a BSON serialization error. Once a batch of one
# document still fails, that document is moved to error_docs.
#
# op_type    - write type, forwarded to the send helper.
# documents  - non-empty array of documents. When opts[:ordered] is non-nil
#              the entries are read for :ord when an error is reported.
# check_keys - forwarded to the send helper.
# opts       - options hash. Keys read here: :ordered, :continue_on_error,
#              :collect_on_error. It is also passed to get_write_concern,
#              forwarded to the send helper and stored in every exchange.
#
# Returns [error_docs, errors, write_concern_errors, exchanges].
# Raises Mongo::OperationFailure if +documents+ is empty.
def batch_write_partition(op_type, documents, check_keys, opts)
  raise Mongo::OperationFailure, "Request contains no documents" if documents.empty?
  write_concern = get_write_concern(opts, @collection)
  ordered = opts[:ordered]
  continue_on_error = !!opts[:continue_on_error] || ordered == false # continue_on_error default false
  collect_on_error = !!opts[:collect_on_error] # collect_on_error default false
  error_docs = [] # docs with serialization errors
  errors = []
  write_concern_errors = []
  exchanges = []
  @max_write_batch_size = @collection.db.connection.max_write_batch_size
  @write_batch_size = [documents.size, @max_write_batch_size].min
  docs = documents.dup # work on a copy; the caller's array is not modified
  until docs.empty?
    batch = docs.take(@write_batch_size)
    begin
      if @collection.db.connection.use_write_command?(write_concern) # TODO - polymorphic send_write including legacy insert
        response = send_bulk_write_command(op_type, batch, check_keys, opts)
      else
        response = send_write_operation(op_type, nil, batch, check_keys, opts, write_concern)
      end
      exchanges << {:op_type => op_type, :batch => batch, :opts => opts, :response => response}
      docs = docs.drop(batch.size)
      # 2**(1/10) multiplicative increase (1097/1024), always by at least one
      @write_batch_size = [(@write_batch_size * 1097) >> 10, @write_batch_size + 1].max unless docs.empty?
      @write_batch_size = @max_write_batch_size if @write_batch_size > @max_write_batch_size
    rescue BSON::InvalidDocument, BSON::InvalidKeyName, BSON::InvalidStringEncoding => ex
      if @write_batch_size > 1 # decrease batch size
        @write_batch_size = (@write_batch_size + 1) >> 1 # 2**(-1) multiplicative decrease
        next # retry the same documents with the smaller batch
      end
      # error on a single document
      bulk_message = "Bulk write error - #{ex.message} - examine result for complete information"
      # Only wrap the error when the caller supplied :ordered.
      ex = BulkWriteError.new(bulk_message, Mongo::ErrorCode::INVALID_BSON,
        {:op_type => op_type, :batch => batch, :ord => batch.first[:ord], :opts => opts, :error => ex}) unless ordered.nil?
      error_docs << docs.shift
      next if collect_on_error
      errors << ex
      break unless continue_on_error
    rescue Mongo::WriteConcernError => ex
      write_concern_errors << ex
      # Record the batch that was actually sent. This previously referenced
      # an undefined local (batch_docs) and raised NameError on this path.
      exchanges << {:op_type => op_type, :batch => batch, :opts => opts, :response => ex.result}
      docs = docs.drop(batch.size)
    rescue Mongo::OperationFailure => ex
      errors << ex
      exchanges << {:op_type => op_type, :batch => batch, :opts => opts, :response => ex.result}
      docs = docs.drop(batch.size)
      break if !continue_on_error && !collect_on_error
    end
  end
  [error_docs, errors, write_concern_errors, exchanges]
end
# File lib/mongo/collection_writer.rb, line 152
#
# Builds one write command for +op_type+ and runs it through @db.command.
#
# op_type         - :insert, :update or :delete; used as the command name and
#                   to look up the argument key in WRITE_COMMAND_ARG_KEY.
# documents       - array of documents for the command. For :insert, when
#                   +opts+ has an :ordered key, each entry is a wrapper hash
#                   and its :d value is the document actually sent.
# check_keys      - when true, insert documents are rejected if any top-level
#                   key starts with '$' or contains '.'.
# opts            - options hash. Keys read here: :ordered and
#                   :continue_on_error. It is also passed to get_write_concern.
# collection_name - target collection; defaults to this writer's @name.
#
# An explicit :ordered value (true or false) is sent as given. When :ordered
# is absent or nil, the command is ordered unless :continue_on_error is set.
#
# Returns the result of @db.command.
# Raises BSON::InvalidKeyName when check_keys is true and a key is invalid.
def send_bulk_write_command(op_type, documents, check_keys, opts, collection_name=@name)
  if op_type == :insert
    documents = documents.map { |doc| doc[:d] } if opts.key?(:ordered)
    # TODO - @pk_factory.create_pk(doc)
    if check_keys
      documents.each do |doc|
        doc.each_key do |key|
          key = key.to_s
          raise BSON::InvalidKeyName, "key #{key} must not start with '$'" if key.start_with?('$')
          raise BSON::InvalidKeyName, "key #{key} must not contain '.'" if key.include?('.')
        end
      end
    end
  #elsif op_type == :update # TODO - check keys
  #elsif op_type == :delete
  #else
  #  raise ArgumentError, "Write operation type must be :insert, :update or :delete"
  end

  # `opts[:ordered] || ...` would turn an explicit false into true, so only
  # fall back to :continue_on_error when :ordered was not given.
  ordered = opts[:ordered]
  ordered = !opts[:continue_on_error] if ordered.nil?

  request = BSON::OrderedHash[op_type, collection_name].merge!(
    Mongo::CollectionWriter::WRITE_COMMAND_ARG_KEY[op_type] => documents,
    :writeConcern => get_write_concern(opts, @collection),
    :ordered => ordered
  )
  @db.command(request)
end
Generated with the Darkfish Rdoc Generator 2.